mac-message - 进阶
自定义消息转换器
  框架中预定义了两种消息转换器。JmsMsgConverter和AmqMsgConverter,可自由添加和实现。
  JmsMsgConverter 消息转换器用于实现 javax.jms.Message 与 com.boarsoft.message.bean.Message 之间的互转。
  ActiveMQ长连接客户端会使用JmsMsgConverter实现两种消息的转换。 
	public class DemoJmsMsgConverter implements JmsMsgConverter {
	
		@Override
		public Message convert(javax.jms.Message am) throws Exception {
			String o = ((TextMessage) am).getText();
			Message m = JsonUtil.parseObject(o, Message.class);
			return m;
		}
	
		@Override
		public javax.jms.Message convert(Session session, Message m) throws Exception {
			// return session.createTextMessage((String) m.getContent());
			return session.createTextMessage(m.toJson());
		}
	}
  AmqMsgConverter 消息转换器用于实现 org.apache.activemq.command.Message 与 com.boarsoft.message.bean.Message 之间的互转。
  消息中间件的ActiveMQ插件会使用AmqMsgConverter实现两种消息的转换。 
	public class AmqJsonMsgConverter implements AmqMsgConverter {
		private static final Logger log = LoggerFactory.getLogger(AmqJsonMsgConverter.class);
	
		@Override
		public com.boarsoft.message.bean.Message convert(Message am) throws Exception {
			ActiveMQTextMessage tm = (ActiveMQTextMessage) am;
			String s = tm.getText();
			log.debug("Received activemq text message: {}", s);
			return JsonUtil.parseObject(s, com.boarsoft.message.bean.Message.class);
		}
	
		@Override
		public Message convert(com.boarsoft.message.bean.Message m) throws Exception {
			return null;
		}
	}
ActiveMQ消息拦截与转换
  拦截ActiveMQ消息并转换为内部的消息处理方式,这样就可以实现ActiveMQ与基于RPC的生产者和消费者的互通。
  中间件的 activemq.xml
	
		
			
			
		
		
			
			
			
				
				
				
					
				
			
		
	
收件人消息签收
  消息中间件通过向发件人发送签收回执消息来实现消息的签收。按照常理,一般是由消息发送者负责接收消息签收的回执,因此编写一个发件人来发送消息,并接收回执。
	@Component("queue1Producer1")
	public class Queue1Producer1 implements MessageService, MessageSender {
		private static final Logger log = LoggerFactory.getLogger(Queue1Producer1.class);
	
		@Override
		public void put(Message m) throws Exception {
			log.info("Received acknowledge: {}", JsonUtil.from(m));
		}
	
		@Override
		public void send(Message m) throws Exception {
			// 设置from作为签收回执消息的消费者
			m.setFrom("queue1Producer1");
			Messenger.send(m);
		}
	}
  让生产者暴露这个服务,provide.xml
	
	
  让中间件引用这个服务,consume.xml
	
	
  发送消息时先设置消息的Acknowledge属性再调它的send方法即可(注意此方法有设置from属性)。
	final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:conf/context.xml");
	Message m = new Message();
	m.setTarget("queue1");
	m.setContent("Hello world!");
	m.setId(RandomUtil.genUUID());
	m.addRecipient("queue1Consumer1");
	// 填写用于接收签收回执的消息队列
	m.setAcknowledge("ack1");
	Queue1Producer1 qp1 = (Queue1Producer1) ctx.getBean("queue1Producer1");
	qp1.send(m);
过滤与分拣插件
  用于动态决定每条消息的收件人。实现如下接口即可:
	/**
	 * 返回收件人列表,其中key为收件人ID,value为是否必达
	 * 
	 * @param m
	 * @return
	 */
	public interface RecipientFinder {
		Map find(Message m);
	}