/**
* 初始化队列,用于集群部署时的数据更新
*/
private void initRabbit(NutConfig config, Dao dao) {
try {
String queue = R.UU32(), topicQueue = "topicQueue";
ConnectionFactory factory = config.getIoc().get(ConnectionFactory.class, "rabbitmq_cf");
log.debug("RabbitMQ:::" + factory.getHost());
rabbitmq_conn = factory.newConnection();
rabbitmq_channel = rabbitmq_conn.createChannel();
rabbitmq_channel.queueDeclare(queue, true, true, false, null);
rabbitmq_channel.queueDeclare(topicQueue, true, false, false, null);
rabbitmq_channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC, true);
rabbitmq_channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT, true);
rabbitmq_channel.queueBind(queue, "fanoutExchange", "");
rabbitmq_channel.queueBind(topicQueue, "topicExchange", "topic.#");
rabbitmq_channel.basicConsume(queue, false, "myConsumerTagFanout",
new DefaultConsumer(rabbitmq_channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
NutMap params = Lang.fromBytes(body, NutMap.class);
log.debug("RabbitMQ fanoutExchange=" + exchange + ",routingKey=" + routingKey + ",params=" + Json.toJson(params));
long deliveryTag = envelope.getDeliveryTag();
switch (exchange) {
case "fanoutExchange"://广播模式,每个消费者都会消费
switch (routingKey) {
case "sysconfig":
Globals.initSysConfig(dao);
break;
case "sysroute":
Globals.initRoute(dao);
break;
case "wxtoken":
Globals.WxMap.clear();
break;
default:
break;
}
break;
}
// (process the message components here ...)
rabbitmq_channel.basicAck(deliveryTag, false);
}
});
rabbitmq_channel.basicConsume(topicQueue, false, "myConsumerTagTopic",
new DefaultConsumer(rabbitmq_channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
NutMap params = Lang.fromBytes(body, NutMap.class);
log.debug("RabbitMQ topicExchange=" + exchange + ",routingKey=" + routingKey + ",params=" + Json.toJson(params));
long deliveryTag = envelope.getDeliveryTag();
switch (exchange) {
case "topicExchange"://主题模式,只需一个消费者消费
switch (routingKey) {
case "topic.test.me":
log.debug("topic.test.me.......");
break;
}
break;
}
// (process the message components here ...)
rabbitmq_channel.basicAck(deliveryTag, false);
}
});
Globals.RabbitMQEnabled = true;
} catch (Exception e) {
e.printStackTrace();
}
}
在3.5.X版本中加入rabbit,本地没问题,到了测环境老是报错
SEVERE: An unexpected connection driver error occured
java.lang.NoClassDefFoundError: com/rabbitmq/client/impl/ContentHeaderPropertyReader
at com.rabbitmq.client.AMQP$BasicProperties.<init>(AMQP.java:1660)
at com.rabbitmq.client.impl.AMQImpl.readContentHeaderFrom(AMQImpl.java:3511)
at com.rabbitmq.client.impl.CommandAssembler.consumeHeaderFrame(CommandAssembler.java:101)
at com.rabbitmq.client.impl.CommandAssembler.handleFrame(CommandAssembler.java:158)
at com.rabbitmq.client.impl.AMQCommand.handleFrame(AMQCommand.java:89)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:88)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.impl.ContentHeaderPropertyReader
at weblogic.utils.classloaders.GenericClassLoader.findLocalClass(GenericClassLoader.java:297)
at weblogic.utils.classloaders.GenericClassLoader.findClass(GenericClassLoader.java:270)
at weblogic.utils.classloaders.ChangeAwareClassLoader.findClass(ChangeAwareClassLoader.java:64)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at weblogic.utils.classloaders.GenericClassLoader.loadClass(GenericClassLoader.java:179)
at weblogic.utils.classloaders.ChangeAwareClassLoader.loadClass(ChangeAwareClassLoader.java:52)
... 10 more
Exception in thread "AMQP Connection 10.19.106.244:5672" java.lang.NoClassDefFoundError: com/rabbitmq/client/impl/SocketFrameHandler$1
at com.rabbitmq.client.impl.SocketFrameHandler.close(SocketFrameHandler.java:185)
at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:680)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.impl.SocketFrameHandler$1
at weblogic.utils.classloaders.GenericClassLoader.findLocalClass(GenericClassLoader.java:297)
at weblogic.utils.classloaders.GenericClassLoader.findClass(GenericClassLoader.java:270)
at weblogic.utils.classloaders.ChangeAwareClassLoader.findClass(ChangeAwareClassLoader.java:64)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at weblogic.utils.classloaders.GenericClassLoader.loadClass(GenericClassLoader.java:179)
at weblogic.utils.classloaders.ChangeAwareClassLoader.loadClass(ChangeAwareClassLoader.java:52)
... 4 more
求大神帮忙看看。。。