Q&A: After nutz successfully adds a message to the queue, how should the listener method be defined?
Posted 1711 days ago by hryc, 1557 views
@Aop("rabbitmq") // Automatically manages opening and closing of the Connection/Channel.
    public void publish(String routingKey, byte[] body) throws Exception {
        channel().queueDeclare(routingKey, false, false, false, null);
        channel().basicPublish("", routingKey, null, body);
        logger.info("Message added to the queue successfully, routingKey: " + routingKey);
    }

How should the listener method that receives messages be defined? The demo doesn't seem to include an example.

13 replies

Take a look at how nutzwk uses it.

I had been looking in nutzBoot. As far as I can see, nutzwk on GitHub and Gitee has no code that uses rabbitmq.
https://github.com/Wizzercn/NutzWk/tree/v5.x/wk-app

I wrote the following. The method gets called, but the consumer callback never runs.

/**
     * Continuously listen on the queue to receive data
     * @throws IOException
     * @throws TimeoutException
     */
    @Aop("rabbitmq")
    public void receiveMessage() throws IOException, TimeoutException {
        logger.info("Entering the receiveMessage listener method");
        // Prefetch at most 5 messages locally at a time
        channel().basicQos(5);
        channel().basicConsume("questionnaireUser", true, new DefaultConsumer(channel()) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                logger.info("receiveMessage started consuming: " + message);
                JSONObject json = JSON.parseObject(message);
                logger.debug("questionnaireUser" + " Received '" + message + "'" + ", routingKey: " + envelope.getRoutingKey());
                // Handle the received message
                switch (json.getString("exchange")) {
          /*  case "fanoutExchange":
                switch (json.getString("routeKey")) {
                    case "sysconfig":
                        Globals.initSysConfig(dao);
                        break;
                    case "sysroute":
                        Globals.initRoute(dao);
                        break;
                    case "wxtoken":
                        Globals.WxMap.clear();
                        break;
                    case "top.order.create":
                        log.debug("Parameters of the order-created broadcast message: " + json.getJSONObject("params"));
                        break;
                    default:
                        break;
                }
                break;*/
                    case "topicExchange":
                        switch (json.getString("routeKey")) {
                            case "examSave":
                                saveOrSubmit(json);
                                break;
                            case "examSubmit":
                                saveOrSubmit(json);
                                break;
                            case "questionnaireUser":
                                // Questionnaire submission
                                saveSubmitQuestionnaire(json);
                                break;
                            case "trainClassQuestionnaire":
                                saveSubmitTrainClassQuestionnaire(json);
                                break;
                        }
                        break;

                }
                // Keep listening
                channel().basicConsume("questionnaireUser", true, this);
                channel().basicAck(envelope.getDeliveryTag(), true);
            }
        });
    }
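
The nested switch in the callback above routes each message by its "exchange" and "routeKey" fields. As a rough sketch only, the same routing could be kept in a lookup table so that handleDelivery stays short. The class name MessageDispatcher, its register and dispatch methods, and the use of a plain Map<String, String> in place of the parsed JSONObject are all hypothetical choices made for illustration; none of them come from nutz or the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: routes a parsed message to a handler keyed by exchange and routeKey. */
class MessageDispatcher {
    private final Map<String, Consumer<Map<String, String>>> handlers = new HashMap<>();

    // Combine both fields into one lookup key; a null field simply fails to match.
    private static String key(String exchange, String routeKey) {
        return exchange + "/" + routeKey;
    }

    /** Registers the handler to call for one exchange/routeKey pair. */
    public void register(String exchange, String routeKey, Consumer<Map<String, String>> handler) {
        handlers.put(key(exchange, routeKey), handler);
    }

    /** Returns true if a handler was found and invoked, false otherwise. */
    public boolean dispatch(Map<String, String> message) {
        Consumer<Map<String, String>> handler =
                handlers.get(key(message.get("exchange"), message.get("routeKey")));
        if (handler == null) {
            return false;
        }
        handler.accept(message);
        return true;
    }

    public static void main(String[] args) {
        MessageDispatcher dispatcher = new MessageDispatcher();
        List<String> called = new ArrayList<>();
        // The strings stand in for the real handlers (saveOrSubmit, saveSubmitQuestionnaire, ...).
        dispatcher.register("topicExchange", "examSave", m -> called.add("saveOrSubmit"));
        dispatcher.register("topicExchange", "questionnaireUser", m -> called.add("saveSubmitQuestionnaire"));

        Map<String, String> message = new HashMap<>();
        message.put("exchange", "topicExchange");
        message.put("routeKey", "questionnaireUser");
        System.out.println(dispatcher.dispatch(message) + " " + called);
    }
}
```

Unlike a switch on a String, the map lookup does not throw when a field is missing from the message; dispatch just returns false. Separately, since the queue is consumed with autoAck set to true, the broker already treats each delivery as acknowledged, so the explicit basicAck is not needed, and basicConsume normally only has to be called once per consumer rather than again inside the callback.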

public void init() throws IOException, TimeoutException {
    Mvcs.X_POWERED_BY = "nutzwk 5.2.x <wizzer.cn>";
    Globals.AppBase = Mvcs.getServletContext().getContextPath();
    Globals.AppRoot = Mvcs.getServletContext().getRealPath("/");
    // Register custom tags
    groupTemplate.registerTagFactory("cms_channel_list", () -> ioc.get(CmsChannelListTag.class));
    groupTemplate.registerTagFactory("cms_channel", () -> ioc.get(CmsChannelTag.class));
    groupTemplate.registerTagFactory("cms_article_list", () -> ioc.get(CmsArticleListTag.class));
    groupTemplate.registerTagFactory("cms_article", () -> ioc.get(CmsArticleTag.class));
    groupTemplate.registerTagFactory("cms_link_list", () -> ioc.get(CmsLinkListTag.class));
    // MQ service
    try {
        String routingKey = "questionnaireUser";
        ioc.get(RabbitTestService.class).publish(routingKey, "Hello Word********".getBytes());
        ioc.get(RabbitTestService.class).receiveMessage();
        // ioc.get(RabbitTestService.class).get(routingKey);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

I ran it once at startup.
@wendal

No idea. Try asking 大鲨鱼 (Big Shark).

Search for the rabbitmq documentation. It is all standard usage, so compare your code against it to see what is off.

[DEBUG] 13:56:33.120 [main] org.nutz.ioc.impl.NutIoc - Get 'dubbo_iobjs'<>
[DEBUG] 13:56:33.120 [main] org.nutz.ioc.impl.NutIoc - Get 'dubboApplicationConfig'<class com.alibaba.dubbo.config.ApplicationConfig>
[DEBUG] 13:56:33.121 [main] org.nutz.ioc.impl.NutIoc - Get 'dubboRegistryConfig'<class com.alibaba.dubbo.config.RegistryConfig>
[DEBUG] 13:56:33.202 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 81,3  replyHeader:: 81,34214,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/consumers,F  response:: s{404,404,1569224829144,1569224829144,0,912,0,0,10,0,34202} 
[DEBUG] 13:56:33.216 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 82,1  replyHeader:: 82,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/consumers/consumer%253A%252F%252F192.168.175.1%252Fcn.wizzer.app.sys.modules.services.SysRouteService%253Fapplication%253Dwk-nb-web-vue%2526category%253Dconsumers%2526check%253Dfalse%2526default.retries%253D0%2526default.timeout%253D300000%2526dubbo%253D2.0.2%2526interface%253Dcn.wizzer.app.sys.modules.services.SysRouteService%2526methods%253DupdateWithVersion%252Cdata%252CinsertRelation%252CinsertWith%252CinsertOrUpdate%252Cdao%252CgetMaxId%252CgetSubPath%252Cquery%252Ccount%252CgetField%252ClistEntity%252CvDelete%252Clist%252Cexecute%252Cfetchx%252CgetMap%252CupdateAndIncrIfMatch%252CupdateLinks%252CgetEntityClass%252CfetchLinks%252CupdateRelation%252CfastInsert%252Cupdate%252Cinsert%252CinsertLinks%252CgetParentPath%252Cdelete%252CupdateIgnoreNull%252ClistPageLinks%252Cclear%252CgetEntity%252CupdateWith%252Cfetch%252CgetNutMap%252Cexists%252ClistPageMap%252ClistPage%2526pid%253D29848%2526qos.enable%253Dfalse%2526side%253Dconsumer%2526timestamp%253D1583819793128,#3139322e3136382e3137352e31,v{s{31,s{'world,'anyone}}},1  response:: 
'/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/consumers/consumer%253A%252F%252F192.168.175.1%252Fcn.wizzer.app.sys.modules.services.SysRouteService%253Fapplication%253Dwk-nb-web-vue%2526category%253Dconsumers%2526check%253Dfalse%2526default.retries%253D0%2526default.timeout%253D300000%2526dubbo%253D2.0.2%2526interface%253Dcn.wizzer.app.sys.modules.services.SysRouteService%2526methods%253DupdateWithVersion%252Cdata%252CinsertRelation%252CinsertWith%252CinsertOrUpdate%252Cdao%252CgetMaxId%252CgetSubPath%252Cquery%252Ccount%252CgetField%252ClistEntity%252CvDelete%252Clist%252Cexecute%252Cfetchx%252CgetMap%252CupdateAndIncrIfMatch%252CupdateLinks%252CgetEntityClass%252CfetchLinks%252CupdateRelation%252CfastInsert%252Cupdate%252Cinsert%252CinsertLinks%252CgetParentPath%252Cdelete%252CupdateIgnoreNull%252ClistPageLinks%252Cclear%252CgetEntity%252CupdateWith%252Cfetch%252CgetNutMap%252Cexists%252ClistPageMap%252ClistPage%2526pid%253D29848%2526qos.enable%253Dfalse%2526side%253Dconsumer%2526timestamp%253D1583819793128 
[DEBUG] 13:56:33.228 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 83,3  replyHeader:: 83,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/providers,F  response:: s{32,32,1569056026939,1569056026939,0,797,0,0,10,1,33910} 
[DEBUG] 13:56:33.242 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 84,12  replyHeader:: 84,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/providers,T  response:: v{'dubbo%253A%252F%252F192.168.175.1%253A17580%252Fcn.wizzer.app.sys.modules.services.SysRouteService%253Fanyhost%253Dtrue%2526application%253Dwk-nb-service-sys%2526dubbo%253D2.0.2%2526generic%253Dfalse%2526interface%253Dcn.wizzer.app.sys.modules.services.SysRouteService%2526methods%253DupdateWithVersion%252Cdata%252CinsertRelation%252CinsertWith%252CinsertOrUpdate%252Cdao%252CgetMaxId%252CgetSubPath%252Cquery%252Ccount%252CgetField%252CvDelete%252ClistEntity%252Clist%252Cexecute%252Cfetchx%252CgetMap%252CupdateAndIncrIfMatch%252CupdateLinks%252CgetEntityClass%252CfetchLinks%252CupdateRelation%252CfastInsert%252Cupdate%252Cinsert%252CinsertLinks%252CgetParentPath%252Cdelete%252CupdateIgnoreNull%252ClistPageLinks%252Cclear%252CgetEntity%252CupdateWith%252Cfetch%252CgetNutMap%252Cexists%252ClistPageMap%252ClistPage%2526pid%253D20328%2526side%253Dprovider%2526threads%253D200%2526timestamp%253D1583807961064},s{32,32,1569056026939,1569056026939,0,797,0,0,10,1,33910} 
[DEBUG] 13:56:33.255 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 85,3  replyHeader:: 85,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/configurators,F  response:: s{34,34,1569056027003,1569056027003,0,0,0,0,10,0,34} 
[DEBUG] 13:56:33.268 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 86,12  replyHeader:: 86,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/configurators,T  response:: v{},s{34,34,1569056027003,1569056027003,0,0,0,0,10,0,34} 
[DEBUG] 13:56:33.285 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 87,3  replyHeader:: 87,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/routers,F  response:: s{406,406,1569224829265,1569224829265,0,0,0,0,10,0,406} 
[DEBUG] 13:56:33.298 [main-SendThread(120.92.153.163:2181)] org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x170042d433204b0, packet:: clientPath:null serverPath:null finished:false header:: 88,12  replyHeader:: 88,34215,0  request:: '/dubbo/cn.wizzer.app.sys.modules.services.SysRouteService/routers,T  response:: v{},s{406,406,1569224829265,1569224829265,0,0,0,0,10,0,406} 
[INFO ] 13:56:33.407 [main] io.seata.common.loader.EnhancedServiceLoader - load ContextCore[null] extension by class[io.seata.core.context.ThreadLocalContextCore]
[DEBUG] 13:56:33.407 [main] io.seata.integration.dubbo.alibaba.TransactionPropagationFilter - xid in RootContext[null] xid in RpcContext[null]
[DEBUG] 13:56:33.495 [main] io.seata.integration.dubbo.alibaba.TransactionPropagationFilter - xid in RootContext[null] xid in RpcContext[null]
[DEBUG] 13:56:33.520 [main] org.nutz.ioc.impl.NutIoc - Get 'webPubSub'<class cn.wizzer.app.web.commons.ext.pubsub.WebPubSub>
[DEBUG] 13:56:33.520 [main] org.nutz.ioc.impl.NutIoc - 	 >> Load definition name=webPubSub
[DEBUG] 13:56:33.520 [main] org.nutz.ioc.loader.combo.ComboIocLoader - Found IocObject(webPubSub) in AnnotationIocLoader(packages=[cn.wizzer])
[DEBUG] 13:56:33.520 [main] org.nutz.ioc.impl.NutIoc - 	 >> Make...'webPubSub'<class cn.wizzer.app.web.commons.ext.pubsub.WebPubSub>
[DEBUG] 13:56:33.520 [main] org.nutz.ioc.impl.ScopeContext - Save object 'webPubSub' to [app] 
[DEBUG] 13:56:33.520 [main] org.nutz.ioc.aop.impl.DefaultMirrorFactory - Load class cn.wizzer.app.web.commons.ext.pubsub.WebPubSub without AOP
[DEBUG] 13:56:33.522 [main] org.nutz.ioc.impl.NutIoc - Get 'pubSubService'<class org.nutz.integration.jedis.pubsub.PubSubService>
[DEBUG] 13:56:33.522 [main] org.nutz.ioc.val.ReferTypeValue - name=sysConfigService not found, search for type=cn.wizzer.app.sys.modules.services.SysConfigService
[DEBUG] 13:56:33.522 [main] org.nutz.ioc.impl.NutIoc - Get 'uhr8r221oqg3ird7ke0vda6vch'<interface cn.wizzer.app.sys.modules.services.SysConfigService>
[DEBUG] 13:56:33.522 [main] org.nutz.ioc.val.ReferTypeValue - name=sysRouteService not found, search for type=cn.wizzer.app.sys.modules.services.SysRouteService
[DEBUG] 13:56:33.523 [main] org.nutz.ioc.impl.NutIoc - Get 'rj70qo7qr4iuqoahpht8hf3bib'<interface cn.wizzer.app.sys.modules.services.SysRouteService>
[DEBUG] 13:56:33.523 [main] org.nutz.ioc.impl.NutIoc - Get 'conf'<class org.nutz.ioc.impl.PropertiesProxy>
[DEBUG] 13:56:33.523 [main] org.nutz.ioc.impl.NutIoc - Get 'groupTemplate'<class org.beetl.core.GroupTemplate>
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.NutIoc - Get 'rabbitMqService'<class cn.wizzer.app.web.commons.rabbit.RabbitMqService>
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.NutIoc - 	 >> Load definition name=rabbitMqService
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.loader.combo.ComboIocLoader - Found IocObject(rabbitMqService) in AnnotationIocLoader(packages=[cn.wizzer])
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.NutIoc - 	 >> Make...'rabbitMqService'<class cn.wizzer.app.web.commons.rabbit.RabbitMqService>
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.ScopeContext - Save object 'rabbitMqService' to [app] 
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.NutIoc - Get 'rabbitmq'<interface org.nutz.aop.MethodInterceptor>
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.NutIoc - 	 >> Load definition name=rabbitmq
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.loader.combo.ComboIocLoader - Found IocObject(rabbitmq) in AnnotationIocLoader(packages=[org.nutz.integration.rabbitmq])
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.NutIoc - 	 >> Make...'rabbitmq'<interface org.nutz.aop.MethodInterceptor>
[DEBUG] 13:56:33.524 [main] org.nutz.ioc.impl.ScopeContext - Save object 'rabbitmq' to [app] 
[DEBUG] 13:56:33.526 [main] org.nutz.ioc.impl.NutIoc - Get 'rabbitmq_cf'<>
[DEBUG] 13:56:33.526 [main] org.nutz.ioc.impl.NutIoc - 	 >> Load definition name=rabbitmq_cf
[DEBUG] 13:56:33.526 [main] org.nutz.ioc.loader.combo.ComboIocLoader - Found IocObject(rabbitmq_cf) in AnnotationIocLoader(packages=[org.nutz.integration.rabbitmq])
[DEBUG] 13:56:33.526 [main] org.nutz.ioc.impl.NutIoc - 	 >> Make...'rabbitmq_cf'<>
[DEBUG] 13:56:33.526 [main] org.nutz.ioc.impl.ScopeContext - Save object 'rabbitmq_cf' to [app] 
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.aop.impl.DefaultMirrorFactory - Load class com.rabbitmq.client.ConnectionFactory without AOP
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.impl.NutIoc - Get 'rabbitMqBeans'<>
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.impl.NutIoc - 	 >> Load definition name=rabbitMqBeans
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.loader.combo.ComboIocLoader - Found IocObject(rabbitMqBeans) in AnnotationIocLoader(packages=[org.nutz.integration.rabbitmq])
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.impl.NutIoc - 	 >> Make...'rabbitMqBeans'<>
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.impl.ScopeContext - Save object 'rabbitMqBeans' to [app] 
[DEBUG] 13:56:33.533 [main] org.nutz.ioc.aop.impl.DefaultMirrorFactory - Load class org.nutz.integration.rabbitmq.RabbitMqBeans without AOP
[DEBUG] 13:56:33.535 [main] org.nutz.ioc.impl.NutIoc - Get 'conf'<class org.nutz.ioc.impl.PropertiesProxy>
[DEBUG] 13:56:33.579 [main] org.nutz.ioc.impl.NutIoc - Get 'rabbitmq'<interface org.nutz.aop.MethodInterceptor>
[DEBUG] 13:56:33.579 [main] org.nutz.ioc.impl.NutIoc - Get 'rabbitmq'<interface org.nutz.aop.MethodInterceptor>
com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
	at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:351)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:948)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:865)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1018)
	at org.nutz.integration.rabbitmq.aop.RabbitmqMethodInterceptor.filter(RabbitmqMethodInterceptor.java:29)
	at org.nutz.aop.InterceptorChain.doChain(InterceptorChain.java:60)
	at cn.wizzer.app.web.commons.rabbit.RabbitMqService$$NUTZAOP.recv(RabbitMqService.java:1)
	at cn.wizzer.app.web.commons.core.WebPlatformMainLauncher.init(WebPlatformMainLauncher.java:78)
	at cn.wizzer.app.web.commons.core.WebPlatformMainLauncher$FM$init$cff301be46c87e2fbea60f4ef344bacb.invoke(WebPlatformMainLauncher.java)
	at org.nutz.ioc.impl.ObjectMakerImpl$2.trigger(ObjectMakerImpl.java:181)
	at org.nutz.ioc.weaver.DefaultWeaver.onCreate(DefaultWeaver.java:89)
	at org.nutz.ioc.impl.ObjectMakerImpl.make(ObjectMakerImpl.java:141)
	at org.nutz.ioc.impl.NutIoc.get(NutIoc.java:241)
	at org.nutz.ioc.impl.NutIoc.get(NutIoc.java:271)
	at org.nutz.ioc.impl.NutIoc.get(NutIoc.java:161)
	at org.nutz.boot.NbApp.execute(NbApp.java:214)
	at org.nutz.boot.NbApp.run(NbApp.java:182)
	at cn.wizzer.app.web.commons.core.WebPlatformMainLauncher.main(WebPlatformMainLauncher.java:52)
[INFO ] 13:56:33.796 [main] org.nutz.boot.NbApp - wk-nb-web-vue started : 20278ms
[ERROR] 13:56:33.797 [AMQP Connection 120.92.153.163:5672] com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: Socket Closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
	at java.lang.Thread.run(Thread.java:748)

In application.properties:

rabbitmq.host=120.92.153.163
rabbitmq.port=5672
rabbitmq.username = *
rabbitmq.password = *

Versions in the pom:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.1.2</version>
</dependency>
<dependency>
    <groupId>org.nutz</groupId>
    <artifactId>nutzboot-starter-rabbitmq</artifactId>
    <version>2.3.8-SNAPSHOT</version>
</dependency>

@wendal @Wizzercn A standalone demo written in plain Java works fine, but I can't get it to integrate inside nutz.

AuthenticationFailureException

The username or password is wrong.

Sure enough, the username and password in the config file had spaces in them. It works now, thanks.
@wendal
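
For readers hitting the same ACCESS_REFUSED error, here is a small sketch of how stray whitespace can survive in a properties value. It uses the JDK's java.util.Properties purely as an illustration; NutzBoot reads its configuration through its own PropertiesProxy, whose parsing may differ, so treat this as an assumption about the general failure mode rather than a description of nutz internals. The class TrimmedConfig, its methods, and the sample values "guest" and "secret" are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper showing that trailing whitespace stays in a loaded property value. */
class TrimmedConfig {
    /** Parses properties from a string (stands in for reading application.properties). */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the value with surrounding whitespace removed, or null if the key is absent. */
    static String getTrimmed(Properties props, String key) {
        String value = props.getProperty(key);
        return value == null ? null : value.trim();
    }

    public static void main(String[] args) {
        // Note the space after "guest" and the tab after "secret".
        Properties props = load("rabbitmq.username = guest \nrabbitmq.password = secret\t\n");
        System.out.println("[" + props.getProperty("rabbitmq.username") + "]");      // prints "[guest ]"
        System.out.println("[" + getTrimmed(props, "rabbitmq.username") + "]");      // prints "[guest]"
    }
}
```

java.util.Properties skips whitespace around the separator but keeps whitespace at the end of the line as part of the value, so "guest " and "guest" are different credentials as far as the broker is concerned. Trimming before calling setUsername and setPassword on the ConnectionFactory is one defensive option.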

MQ method for sending messages

    /**
     * Producer, simulating an item (product) service
     * @throws Exception
     */
//    @Aop("rabbitmq") // Automatically manages opening and closing of the Connection/Channel.
    public void send() throws Exception {
        // Obtain a connection
        Connection connection = ConnectionUtil.getConnection(conf);
        // Obtain a channel
        Channel channel = connection.createChannel();
        // Declare the exchange with type "topic"
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Message content
        String message = "New item: id=001";
        // Publish the message with routing key "item.insert", meaning an item was added
        channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
        System.out.println("[Item service] Sent '" + message + "'");
        // Close the channel
        channel.close();
        // Close the connection
        connection.close();
    }

MQ method that starts the consumer listener; it runs during initialization after the nutz project starts

public void rabbitInit() throws Exception {
    // Obtain a connection
    Connection connection = ConnectionUtil.getConnection(conf);
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange and specify the routing keys to subscribe to
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    // Define the consumer for the queue
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        // Receives and handles a message; like an event listener, it is invoked automatically when a message arrives
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            // body: the message payload
            String message = new String(body);
            System.out.println("[Consumer 1] received: " + message + "!");
        }
    };
    // Listen on the queue with automatic ACK
    channel.basicConsume(QUEUE_NAME, true, consumer);
}
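
One thing worth checking in the two snippets as posted: send() publishes with routing key "item.insert", while the queue is bound only to "item.update" and "item.delete". On a topic exchange a message reaches a queue only if its routing key matches one of the queue's binding patterns, where "*" stands for exactly one dot-separated word and "#" for zero or more words. The sketch below is a simplified, standalone model of that matching rule, not the broker's implementation; the class TopicMatcher is hypothetical.

```java
/** Hypothetical, simplified model of AMQP topic-exchange pattern matching. */
class TopicMatcher {
    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("item.update", "item.insert")); // false
        System.out.println(matches("item.*", "item.insert"));      // true
        System.out.println(matches("item.#", "item"));             // true
    }
}
```

Under this rule, a message published as "item.insert" would not be routed to a queue bound only to "item.update" and "item.delete"; binding with "item.*" or adding an "item.insert" binding would be one way to receive it.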

The same code sends and receives messages successfully both in my own demo and in the nutz project.

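Inside nutz the listener works and the callback runs without problems.
However, while monitoring the RabbitMQ management console I noticed that,
under Overview,
the Ready and Total counts only go up and never come down.
Each message sent increases the count by one. This only happens when the code is integrated into nutz;
my standalone demo does not show this behavior.
I'm using nutzWk.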
@wendal @Wizzercn

It's not a nutz problem. While testing I had started two consumers and then stopped one of them, and that is what caused it.
