NutzCN Logo
问答 rabbitmq消费者中使用redis报错
发布于 99天前 作者 qq_10164104 241 次浏览 复制 上一个帖子 下一个帖子
标签:

代码:

public void receive()
            throws IOException
        {
            chnl.basicQos(1);
            // no auto ack
            boolean autoAck = false;
            chnl.basicConsume(DELAY_QUEUE_NAME, autoAck, new DefaultConsumer(chnl)
            {
                @Aop(Global.REDIS)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException
                {
                    String str = new String(body, "UTF-8");
                    String[] arr = str.split("\\|");
                    String requestId = arr[0];
                    String rechargeType = arr[1];
                    boolean f = jedis().exists("dc:" + requestId);
                    if (f) // key存在表示未接受到通知,调用充值结果查询接口
                    {
                    String message = new String(body, "UTF-8");
                    // 打印出延时原因 rejected | expired | maxlen
                    // 项目中可以根据原因处理目标消息
                    log.info(String.format("[%s|%s|Delay_Receiver] received the delay msg 【%s】 from EXCHANGE: %s, the delay reason is:%s",
                        Thread.currentThread().getName(),
                        System.currentTimeMillis(),
                        message,
                        envelope.getExchange(),
                        properties.getHeaders().get("x-first-death-reason")));
                    // 确认消息
                    chnl.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }

报错信息:

java.lang.NullPointerException
	at com.kaike.recharge.module.rabbitmq.DaoChunDelayQueue$DelayEXRecv$1.handleDelivery(DaoChunDelayQueue.java:137)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:835)
2 回复

。。。 这样的对象可没法aop

用RedisService可以

添加回复
请先登陆
回到顶部