目录


mac-message - 进阶

自定义消息转换器

  框架中预定义了两种消息转换器。JmsMsgConverter和AmqMsgConverter,可自由添加和实现。
  JmsMsgConverter 消息转换器用于实现 javax.jms.Message 与 com.boarsoft.message.bean.Message 之间的互转。
  ActiveMQ长连接客户端会使用JmsMsgConverter实现两种消息的转换。 public class DemoJmsMsgConverter implements JmsMsgConverter { @Override public Message convert(javax.jms.Message am) throws Exception { String o = ((TextMessage) am).getText(); Message m = JsonUtil.parseObject(o, Message.class); return m; } @Override public javax.jms.Message convert(Session session, Message m) throws Exception { // return session.createTextMessage((String) m.getContent()); return session.createTextMessage(m.toJson()); } }   AmqMsgConverter 消息转换器用于实现 org.apache.activemq.command.Message 与 com.boarsoft.message.bean.Message 之间的互转。
  消息中间件的ActiveMQ插件会使用AmqMsgConverter实现两种消息的转换。 public class AmqJsonMsgConverter implements AmqMsgConverter { private static final Logger log = LoggerFactory.getLogger(AmqJsonMsgConverter.class); @Override public com.boarsoft.message.bean.Message convert(Message am) throws Exception { ActiveMQTextMessage tm = (ActiveMQTextMessage) am; String s = tm.getText(); log.debug("Received activemq text message: {}", s); return JsonUtil.parseObject(s, com.boarsoft.message.bean.Message.class); } @Override public Message convert(com.boarsoft.message.bean.Message m) throws Exception { return null; } }

ActiveMQ消息拦截与转换

  拦截ActiveMQ消息并转换为内部的消息处理方式,这样就可以实现ActiveMQ与基于RPC的生产者和消费者的互通。
  中间件的 activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" useJmx="false"> <transportConnectors> <transportConnector uri="amqp://0.0.0.0:5672" /> <transportConnector uri="tcp://0.0.0.0:61616" /> </transportConnectors> <plugins> <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> <!-- ActiveMQ消息拦截器 --> <bean xmlns="http://www.springframework.org/schema/beans" class="com.boarsoft.message.amq.plugin.InterceptorPlugin"> <property name="localMsgSvc1" ref="localMsgSvc1" /> <!-- 消息转换器列表,用于将ActiveMQ消息转换为mac-message中定义的消息对象 --> <property name="converterMap"> <map> <!-- Key为ActiveMQ中的队列名 --> <entry key="queue1"> <bean class="com.boarsoft.boar.message.demo.AmqJsonMsgConverter" /> </entry> </map> </property> </bean> </plugins> </broker>

收件人消息签收

  消息中间件通过向发件人发送签收回执消息来实现消息的签收。按照常理,一般是由消息发送者负责接收消息签收的回执,因此编写一个发件人来发送消息,并接收回执。 @Component("queue1Producer1") public class Queue1Producer1 implements MessageService, MessageSender { private static final Logger log = LoggerFactory.getLogger(Queue1Producer1.class); @Override public void put(Message m) throws Exception { log.info("Received acknowledge: {}", JsonUtil.from(m)); } @Override public void send(Message m) throws Exception { // 设置from作为签收回执消息的消费者 m.setFrom("queue1Producer1"); Messenger.send(m); } }   让生产者暴露这个服务,provide.xml <service ref="queue1Producer1" group="message" name="queue1Producer1" interface="com.boarsoft.message.core.MessageService" version="1.0.0" timeout="12000"> </service>   让中间件引用这个服务,consume.xml <reference id="queue1Producer1" group="message" name="queue1Producer1" interface="com.boarsoft.message.core.MessageService" version="1.0.0" timeout="12000"> </reference>   发送消息时先设置消息的Acknowledge属性再调它的send方法即可(注意此方法有设置from属性)。 final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:conf/context.xml"); Message m = new Message(); m.setTarget("queue1"); m.setContent("Hello world!"); m.setId(RandomUtil.genUUID()); m.addRecipient("queue1Consumer1"); // 填写用于接收签收回执的消息队列 m.setAcknowledge("ack1"); Queue1Producer1 qp1 = (Queue1Producer1) ctx.getBean("queue1Producer1"); qp1.send(m);

过滤与分拣插件

  用于动态决定每条消息的收件人。实现如下接口即可: /** * 返回收件人列表,其中key为收件人ID,value为是否必达 * * @param m * @return */ public interface RecipientFinder { Map<String, Boolean> find(Message m); }