`

rocketmq开发手册

 
阅读更多
chaojianc添加,由Jonson Xia最后更新于九月 01, 2014  (查看更改)
转至元数据起始 

  

绑定host

使用前必须将jmenv.taobao.net域名绑定到提供nameserver地址的静态服务器地址 
例如,如果本地部署了提供nameserver地址的静态服务,可以这么配置
127.0.0.1 jmenv.taobao.net

目前没有测试机器,,拿我的机器做测试机器,,可以这么绑定
192.168.66.172 jmenv.taobao.net

 

为什么要绑定呢?参考 Rocektmq 部署章节

定义生产者


 

每一类生产者必须定义一个spring bean,,,而且该bean的producerGroup属性必须唯一(不能跟其他类的生产者重复),否则在使用分布式事务的时候会出现问题。

根据官方规范:producerGroup: 一般发送同样消息的Producer,归为同一个Group,应用必须设置,并保证命名唯一

 

producer配置
<bean id="smsproducer" class="com.alibaba.rocketmq.client.producer.DefaultMQProducer" init-method="start" lazy-init="true">
     <property name="producerGroup" value="smsproductgroup" />
</bean>

注意事项

init-method必须定义为start ,否则producer不会启动

 使用生产者

 


调用producer
<bean id="sendmessage" class="com.lifeix.apollo.user.service.impl.SendMessageDemo">
        <property name="producer" ref="smsproducer" />
</bean>

 

 

 

发消息
public class SendMessageDemo {
    private DefaultMQProducer producer;
    public DefaultMQProducer getProducer() {
    return producer;
    }
    public void setProducer(DefaultMQProducer producer) {
    this.producer = producer;
    }
    public void sendMessage() {
    Message msg = new Message("TopicTest1",// topic
            "TagA",// tag
            "OrderID001",// key
            ("Hello MetaQ").getBytes());// body
    SendResult sendResult;
    try {
        sendResult = producer.send(msg);
        System.out.println(sendResult);
        } catch (Exception e) {
        e.printStackTrace();
       }
    }
}

序列化

rocketmq 存储的是二进制数据,序列化,反序列化由使用者自己定义。 我们统一使用阿里推荐的fastjson 序列化对象,,。发送消息的时候把json string通过utf-8转成byte放入message中。 消费的时候,将byte数组转对象

https://github.com/alibaba/fastjson

 

 

pushconsumer 广播模式用法

 

consumer使用之前必须先调用subscribe 订阅topic ,还需要定义消费消息的Listener,,,没有办法全部在spring里面管理,,,

所以,只能显示的new DefaultMQPushConsumer对象,进行消费的操作

使用注意事项

以下代码必须在bean init方法里面使用,,,而且该bean spring配置必须配置成 init-method="init"

 

 

sample
public class ConsumeMessageClusteringDemo {
    protected void init() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    /**
     * 订阅指定topic下tags分别等于TagA或TagC或TagD
     */
    try {
        consumer.setMessageModel(MessageModel.BROADCASTING); //设置 广播消费模式
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("TopicTest2", "*");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
        /**
         * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            MessageExt msg = msgs.get(0);
            if (msg.getTopic().equals("TopicTest1")) {
            // 执行TopicTest1的消费逻辑
            if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                // 执行TagA的消费
                String msgbody = new String(msg.getBody());
                System.out.println(msgbody);
            } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                // 执行TagC的消费
                String msgbody = new String(msg.getBody());
                System.out.println(msgbody);
            } else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
                // 执行TagD的消费
                String msgbody = new String(msg.getBody());
                System.out.println(msgbody);
            }
            } else if (msg.getTopic().equals("TopicTest2")) {
            // 执行TopicTest2的消费逻辑
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        });
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         */
        consumer.start();
     } catch (MQClientException e) {
        e.printStackTrace();
     }
    }
}

 

pushconsumer 集群模式用法

pushconsumer 集群消费模式,,,跟广播很相似,,只是setMessageModel 方法传参做点修改即可

  

consumer.setMessageModel(MessageModel.CLUSTERING); //设置 广播消费模式

 

demo代码参考

注:参考 apollo 工程目录结构  ssh://git@pangtong.l99.com:19022/framework/lifeix-demo.git

apollo.user.impl 模块

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics