代码:
public void receive()
throws IOException
{
chnl.basicQos(1);
// no auto ack
boolean autoAck = false;
chnl.basicConsume(DELAY_QUEUE_NAME, autoAck, new DefaultConsumer(chnl)
{
@Aop(Global.REDIS)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException
{
String str = new String(body, "UTF-8");
String[] arr = str.split("\\|");
String requestId = arr[0];
String rechargeType = arr[1];
boolean f = jedis().exists("dc:" + requestId);
if (f) // key存在表示未接受到通知,调用充值结果查询接口
{
String message = new String(body, "UTF-8");
// 打印出延时原因 rejected | expired | maxlen
// 项目中可以根据原因处理目标消息
log.info(String.format("[%s|%s|Delay_Receiver] received the delay msg 【%s】 from EXCHANGE: %s, the delay reason is:%s",
Thread.currentThread().getName(),
System.currentTimeMillis(),
message,
envelope.getExchange(),
properties.getHeaders().get("x-first-death-reason")));
// 确认消息
chnl.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
报错信息:
java.lang.NullPointerException
at com.kaike.recharge.module.rabbitmq.DaoChunDelayQueue$DelayEXRecv$1.handleDelivery(DaoChunDelayQueue.java:137)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:835)