NutzCN Logo
问答 rabbitMq和nutzwk
发布于 590天前 作者 1037424761 580 次浏览 复制 上一个帖子 下一个帖子
标签:
/**
     * 初始化队列,用于集群部署时的数据更新
     */
    private void initRabbit(NutConfig config, Dao dao) {
        try {
            String queue = R.UU32(), topicQueue = "topicQueue";
            ConnectionFactory factory = config.getIoc().get(ConnectionFactory.class, "rabbitmq_cf");
            log.debug("RabbitMQ:::" + factory.getHost());
            rabbitmq_conn = factory.newConnection();
            rabbitmq_channel = rabbitmq_conn.createChannel();
            rabbitmq_channel.queueDeclare(queue, true, true, false, null);
            rabbitmq_channel.queueDeclare(topicQueue, true, false, false, null);
            rabbitmq_channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC, true);
            rabbitmq_channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT, true);
            rabbitmq_channel.queueBind(queue, "fanoutExchange", "");
            rabbitmq_channel.queueBind(topicQueue, "topicExchange", "topic.#");
            rabbitmq_channel.basicConsume(queue, false, "myConsumerTagFanout",
                    new DefaultConsumer(rabbitmq_channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                                throws IOException {
                            String routingKey = envelope.getRoutingKey();
                            String exchange = envelope.getExchange();
                            NutMap params = Lang.fromBytes(body, NutMap.class);
                            log.debug("RabbitMQ fanoutExchange=" + exchange + ",routingKey=" + routingKey + ",params=" + Json.toJson(params));
                            long deliveryTag = envelope.getDeliveryTag();
                            switch (exchange) {
                                case "fanoutExchange"://广播模式,每个消费者都会消费
                                    switch (routingKey) {
                                        case "sysconfig":
                                            Globals.initSysConfig(dao);
                                            break;
                                        case "sysroute":
                                            Globals.initRoute(dao);
                                            break;
                                        case "wxtoken":
                                            Globals.WxMap.clear();
                                            break;
                                        default:
                                            break;
                                    }
                                    break;
                            }
                            // (process the message components here ...)
                            rabbitmq_channel.basicAck(deliveryTag, false);
                        }
                    });
            rabbitmq_channel.basicConsume(topicQueue, false, "myConsumerTagTopic",
                    new DefaultConsumer(rabbitmq_channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                                throws IOException {
                            String routingKey = envelope.getRoutingKey();
                            String exchange = envelope.getExchange();
                            NutMap params = Lang.fromBytes(body, NutMap.class);
                            log.debug("RabbitMQ topicExchange=" + exchange + ",routingKey=" + routingKey + ",params=" + Json.toJson(params));
                            long deliveryTag = envelope.getDeliveryTag();
                            switch (exchange) {
                                case "topicExchange"://主题模式,只需一个消费者消费
                                    switch (routingKey) {
                                        case "topic.test.me":
                                            log.debug("topic.test.me.......");
                                            break;
                                    }
                                    break;
                            }
                            // (process the message components here ...)
                            rabbitmq_channel.basicAck(deliveryTag, false);
                        }
                    });
            Globals.RabbitMQEnabled = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

在3.5.X版本中加入rabbit,本地没问题,到了测环境老是报错

SEVERE: An unexpected connection driver error occured
java.lang.NoClassDefFoundError: com/rabbitmq/client/impl/ContentHeaderPropertyReader
	at com.rabbitmq.client.AMQP$BasicProperties.<init>(AMQP.java:1660)
	at com.rabbitmq.client.impl.AMQImpl.readContentHeaderFrom(AMQImpl.java:3511)
	at com.rabbitmq.client.impl.CommandAssembler.consumeHeaderFrame(CommandAssembler.java:101)
	at com.rabbitmq.client.impl.CommandAssembler.handleFrame(CommandAssembler.java:158)
	at com.rabbitmq.client.impl.AMQCommand.handleFrame(AMQCommand.java:89)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:88)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.impl.ContentHeaderPropertyReader
	at weblogic.utils.classloaders.GenericClassLoader.findLocalClass(GenericClassLoader.java:297)
	at weblogic.utils.classloaders.GenericClassLoader.findClass(GenericClassLoader.java:270)
	at weblogic.utils.classloaders.ChangeAwareClassLoader.findClass(ChangeAwareClassLoader.java:64)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at weblogic.utils.classloaders.GenericClassLoader.loadClass(GenericClassLoader.java:179)
	at weblogic.utils.classloaders.ChangeAwareClassLoader.loadClass(ChangeAwareClassLoader.java:52)
	... 10 more

Exception in thread "AMQP Connection 10.19.106.244:5672" java.lang.NoClassDefFoundError: com/rabbitmq/client/impl/SocketFrameHandler$1
	at com.rabbitmq.client.impl.SocketFrameHandler.close(SocketFrameHandler.java:185)
	at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:680)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.impl.SocketFrameHandler$1
	at weblogic.utils.classloaders.GenericClassLoader.findLocalClass(GenericClassLoader.java:297)
	at weblogic.utils.classloaders.GenericClassLoader.findClass(GenericClassLoader.java:270)
	at weblogic.utils.classloaders.ChangeAwareClassLoader.findClass(ChangeAwareClassLoader.java:64)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at weblogic.utils.classloaders.GenericClassLoader.loadClass(GenericClassLoader.java:179)
	at weblogic.utils.classloaders.ChangeAwareClassLoader.loadClass(ChangeAwareClassLoader.java:52)
	... 4 more

求大神帮忙看看。。。

9 回复

附源码

public void initMq() {
		try {
			ConnectionFactory factory = Mvcs.ctx().getDefaultIoc().get(ConnectionFactory.class, "rabbitmq_cf");
			Connection conn = factory.newConnection();
			Channel channel = conn.createChannel();
			channel.queueDeclare(RabbitmqQueue.QUEUE_NAME_SHOP_SAVE, true, false, false, null);
			Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					String message = new String(body, "UTF-8");
					log.info("Customer Received '" + message + "'");
					save(message);
					getChannel().basicAck(envelope.getDeliveryTag(), false);
				}
			};
			channel.basicConsume(RabbitmqQueue.QUEUE_NAME_SHOP_SAVE, false, consumer);
			
		}
		catch (Exception e) {
			log.debug("rabbitmq load error", e);
		}

	}
var ioc = {
	rabbitmq_cf : {
		type : "com.rabbitmq.client.ConnectionFactory",
		factory : "$conf#make",
		args : ["com.rabbitmq.client.ConnectionFactory", "rabbitmq."]
	},
	rabbitmq : {
		type : "org.nutz.integration.rabbitmq.aop.RabbitmqMethodInterceptor",
		fields : {
			factory : {refer:"rabbitmq_cf"}
		}
	}
};
rabbitmq.host=10.19.105.217
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=****

pom.xml写错了

<!-- rabbitmq -->
		<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.nutz</groupId>
            <artifactId>nutz-integration-rabbitmq</artifactId>
            <version>1.r.62</version>
        </dependency>

关键是本地能用

测试环境是打包成war然后跑吗?

那你本地也打包成war试试嘛

也没问题。。。。
测试的机子比较破,经常出现这个问题,一直测试不通过

扔了买台新的

有道理,我打算先把队列放到spring-boot里面去,把测试报告先弄到手,领导急着上线。
谢了。

添加回复
请先登陆
回到顶部