mac-message - 进阶
自定义消息转换器
框架中预定义了两种消息转换器。JmsMsgConverter和AmqMsgConverter,可自由添加和实现。
JmsMsgConverter 消息转换器用于实现 javax.jms.Message 与 com.boarsoft.message.bean.Message 之间的互转。
ActiveMQ长连接客户端会使用JmsMsgConverter实现两种消息的转换。
public class DemoJmsMsgConverter implements JmsMsgConverter {
@Override
public Message convert(javax.jms.Message am) throws Exception {
String o = ((TextMessage) am).getText();
Message m = JsonUtil.parseObject(o, Message.class);
return m;
}
@Override
public javax.jms.Message convert(Session session, Message m) throws Exception {
// return session.createTextMessage((String) m.getContent());
return session.createTextMessage(m.toJson());
}
}
AmqMsgConverter 消息转换器用于实现 org.apache.activemq.command.Message 与 com.boarsoft.message.bean.Message 之间的互转。
消息中间件的ActiveMQ插件会使用AmqMsgConverter实现两种消息的转换。
public class AmqJsonMsgConverter implements AmqMsgConverter {
private static final Logger log = LoggerFactory.getLogger(AmqJsonMsgConverter.class);
@Override
public com.boarsoft.message.bean.Message convert(Message am) throws Exception {
ActiveMQTextMessage tm = (ActiveMQTextMessage) am;
String s = tm.getText();
log.debug("Received activemq text message: {}", s);
return JsonUtil.parseObject(s, com.boarsoft.message.bean.Message.class);
}
@Override
public Message convert(com.boarsoft.message.bean.Message m) throws Exception {
return null;
}
}
ActiveMQ消息拦截与转换
拦截ActiveMQ消息并转换为内部的消息处理方式,这样就可以实现ActiveMQ与基于RPC的生产者和消费者的互通。
中间件的 activemq.xml
收件人消息签收
消息中间件通过向发件人发送签收回执消息来实现消息的签收。按照常理,一般是由消息发送者负责接收消息签收的回执,因此编写一个发件人来发送消息,并接收回执。
@Component("queue1Producer1")
public class Queue1Producer1 implements MessageService, MessageSender {
private static final Logger log = LoggerFactory.getLogger(Queue1Producer1.class);
@Override
public void put(Message m) throws Exception {
log.info("Received acknowledge: {}", JsonUtil.from(m));
}
@Override
public void send(Message m) throws Exception {
// 设置from作为签收回执消息的消费者
m.setFrom("queue1Producer1");
Messenger.send(m);
}
}
让生产者暴露这个服务,provide.xml
让中间件引用这个服务,consume.xml
发送消息时先设置消息的Acknowledge属性再调它的send方法即可(注意此方法有设置from属性)。
final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:conf/context.xml");
Message m = new Message();
m.setTarget("queue1");
m.setContent("Hello world!");
m.setId(RandomUtil.genUUID());
m.addRecipient("queue1Consumer1");
// 填写用于接收签收回执的消息队列
m.setAcknowledge("ack1");
Queue1Producer1 qp1 = (Queue1Producer1) ctx.getBean("queue1Producer1");
qp1.send(m);
过滤与分拣插件
用于动态决定每条消息的收件人。实现如下接口即可:
/**
* 返回收件人列表,其中key为收件人ID,value为是否必达
*
* @param m
* @return
*/
public interface RecipientFinder {
Map find(Message m);
}