mac-message - Examples

Downloads: message application example, simplified message middleware, simplified message management, dependency JARs and source code

RPC-based sending and receiving

  Producer: queues.xml

    <bean id="targetQueueMap" class="java.util.HashMap">
      <constructor-arg>
        <map>
          <entry key="queue1">
            <bean class="com.boarsoft.message.bean.TargetMQ">
              <property name="catalog" value="demo" />
              <property name="code" value="queue1" />
            </bean>
          </entry>
        </map>
      </constructor-arg>
    </bean>

  Producer: consume.xml

    <reference id="serverMsgSvc" group="message" name="server"
        interface="com.boarsoft.message.core.MessageService"
        version="1.0.0" timeout="3000">
    </reference>

  Producer: RpcPubTest.java

    Message m = new Message();
    m.setTarget("queue1");
    m.setContent("Hello world!");
    m.setId(RandomUtil.genUUID());
    m.addRecipient("queue1Consumer1");
    log.info("Send message {}", m);
    // In this case, localMsgSvc1 is injected into Messenger, not amqClient1
    Messenger.send(m);

  Middleware: queues.xml

    <bean id="targetQueueMap" class="java.util.HashMap">
      <constructor-arg>
        <map>
          <entry key="queue1">
            <bean class="com.boarsoft.message.bean.TargetMQ">
              <property name="catalog" value="demo" />
              <property name="code" value="queue1" />
            </bean>
          </entry>
        </map>
      </constructor-arg>
    </bean>

  Middleware: consumer.xml

    <reference id="queue1Consumer1" group="message" name="queue1Consumer1"
        interface="com.boarsoft.message.core.MessageService"
        version="1.0.0" timeout="12000" type="SN">
    </reference>

  Middleware: provider.xml

    <service ref="localMsgSvc1" group="message" name="server"
        interface="com.boarsoft.message.core.MessageService"
        version="1.0.0" timeout="3000">
    </service>

  Consumer: provide.xml

    <service ref="queue1Consumer1" group="message" name="queue1Consumer1"
        interface="com.boarsoft.message.core.MessageService"
        type="SN" version="1.0.0" timeout="12000">
    </service>

  Consumer: Queue1Consumer1.java

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    import com.boarsoft.message.bean.Message;
    import com.boarsoft.message.core.MessageService;
    // JsonUtil is a project utility class; import it from your own code base

    @Component("queue1Consumer1")
    public class Queue1Consumer1 implements MessageService {
        private static final Logger log = LoggerFactory.getLogger(Queue1Consumer1.class);

        // Called by the middleware for each message addressed to this consumer
        @Override
        public void put(Message msg) throws Exception {
            log.info("Received message: {}", JsonUtil.from(msg));
        }
    }
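The RPC flow above (the producer calls the middleware's MessageService, and the middleware forwards the message to the recipients named in it) can be pictured with a small in-process sketch. Everything here is a stand-in written for illustration: the nested Message and MessageService types only mimic the shape of the com.boarsoft classes, and the Middleware class with its routing-by-recipient logic is an assumption about the behavior, not the actual mac-message implementation. No real RPC is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class RpcFlowSketch {

    /** Hypothetical stand-in for com.boarsoft.message.bean.Message. */
    static class Message {
        String id;
        String target;
        String content;
        final List<String> recipients = new ArrayList<>();
    }

    /** Hypothetical stand-in for com.boarsoft.message.core.MessageService. */
    interface MessageService {
        void put(Message msg) throws Exception;
    }

    /** Consumer side: records message contents instead of logging them. */
    static class RecordingConsumer implements MessageService {
        final List<String> received = new ArrayList<>();

        @Override
        public void put(Message msg) {
            received.add(msg.content);
        }
    }

    /** Middleware side (assumed behavior): validate the target, then forward to each recipient. */
    static class Middleware implements MessageService {
        // Plays the role of targetQueueMap in queues.xml
        final Set<String> targetQueues = new HashSet<>();
        // Plays the role of the <reference> entries in the middleware's consumer.xml
        final Map<String, MessageService> recipients = new HashMap<>();

        @Override
        public void put(Message msg) throws Exception {
            if (!targetQueues.contains(msg.target)) {
                throw new IllegalArgumentException("Unknown target: " + msg.target);
            }
            for (String name : msg.recipients) {
                MessageService svc = recipients.get(name);
                if (svc == null) {
                    throw new IllegalArgumentException("Unknown recipient: " + name);
                }
                svc.put(msg);
            }
        }
    }

    /** Wires producer, middleware, and consumer together and sends one message. */
    static List<String> demo() {
        RecordingConsumer consumer = new RecordingConsumer();
        Middleware middleware = new Middleware();
        middleware.targetQueues.add("queue1");
        middleware.recipients.put("queue1Consumer1", consumer);

        // Producer side: same fields as in RpcPubTest.java
        Message m = new Message();
        m.id = UUID.randomUUID().toString();
        m.target = "queue1";
        m.content = "Hello world!";
        m.recipients.add("queue1Consumer1");

        try {
            middleware.put(m);
        } catch (Exception e) {
            throw new IllegalStateException("Delivery failed", e);
        }
        return consumer.received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the real setup each of the three roles runs in its own process, and the `<reference>` and `<service>` declarations replace the direct object references used here.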

Sending and receiving over ActiveMQ/JMS

  The queues.xml files of the consumer, producer, and middleware are the same as in the RPC-based approach above. consume.xml and provide.xml are no longer needed.
  Add a configuration file activemq.xml to the middleware, with the following content:

    <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">
      <transportConnectors>
        <transportConnector uri="amqp://0.0.0.0:5672" />
        <transportConnector uri="tcp://0.0.0.0:61616" />
      </transportConnectors>
      <plugins>
        <discardingDLQBrokerPlugin dropAll="true"
            dropTemporaryTopics="true" dropTemporaryQueues="true" />
        <bean xmlns="http://www.springframework.org/schema/beans"
            class="com.boarsoft.message.amq.plugin.InterceptorPlugin">
          <property name="localMsgSvc1" ref="localMsgSvc1" />
        </bean>
      </plugins>
    </broker>

  Messages can be sent and received over ActiveMQ/JMS in several ways. Two examples are given here: one using a short-lived connection and one using a long-lived connection.
  Sending a message over a short-lived connection: AmqPubTest1.java

    // The connection factory can be created with the ActiveMQ API
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    // It can also be created with the JMS API
    // JmsConnectionFactory factory = new
    // JmsConnectionFactory("amqp://127.0.0.1:5672");
    // JmsConnectionFactory factory = new
    // JmsConnectionFactory("tcp://127.0.0.1:61616");
    Connection connection = factory.createConnection("admin", "admin");
    // connection.setClientID("demo");
    connection.start();
    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("queue1");
    final MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    try {
        // Message here is com.boarsoft.message.bean.Message, not javax.jms.Message
        Message m = new Message();
        m.setTarget("queue1");
        m.setContent("Hello world!");
        // With ActiveMQ/JMS, whether a recipient must be set depends on
        // how the consumer handles the message
        // m.addRecipient("queue1Consumer1");
        m.setId(RandomUtil.genUUID());
        log.info("Send message {}", m);
        TextMessage tm = session.createTextMessage(m.toJson());
        producer.send(tm);
        // ObjectMessage om = session.createObjectMessage(m);
        // producer.send(om);
    } finally {
        // Closing the connection also closes its session and producer
        connection.close();
    }

  The long-lived connection approach requires adding activemq.xml to both the producer and the consumer, with the following content:

    <!-- Converts between javax.jms.Message and com.boarsoft.message.bean.Message -->
    <bean id="msgConverter" class="com.boarsoft.message.demo.DemoJmsMsgConverter">
    </bean>
    <bean id="amqClient1" class="com.boarsoft.boar.gateway.activemq.AmqClientEx">
      <property name="destMap">
        <map>
          <entry key="queue1">
            <bean class="com.boarsoft.boar.gateway.activemq.AmqDestination">
              <property name="producer">
                <bean class="com.boarsoft.boar.gateway.activemq.AmqProducerEx">
                  <property name="converter" ref="msgConverter" />
                </bean>
              </property>
              <property name="consumer">
                <bean class="com.boarsoft.boar.gateway.activemq.AmqConsumerEx">
                  <property name="converter" ref="msgConverter" />
                  <property name="handler" ref="queue1Consumer1" />
                </bean>
              </property>
            </bean>
          </entry>
        </map>
      </property>
      <property name="connectionFactory">
        <bean class="org.apache.activemq.pool.PooledConnectionFactory">
          <property name="maxConnections" value="10" />
          <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
              <property name="brokerURL"
                  value="failover:(tcp://localhost:61616,tcp://127.0.0.1:61616)?randomize=true" />
              <property name="userName" value="admin" />
              <property name="password" value="admin" />
            </bean>
          </property>
        </bean>
      </property>
    </bean>

  Sending a message over a long-lived connection: AmqPubTest2.java

    final MessageService client = (MessageService) ctx.getBean("amqClient1");
    Message m = new Message();
    m.setTarget("queue1");
    m.setContent("Hello world!");
    m.setId(RandomUtil.genUUID());
    client.put(m);
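To make the long-lived connection wiring easier to follow, here is a rough in-process sketch of how a client with a target-keyed destination map, a converter, and a handler might fit together. It is only an illustration under stated assumptions: Client, Destination, and Converter are hypothetical stand-ins for AmqClientEx, AmqDestination, and the message converter; the tab-separated wire format is invented for the sketch; and an in-memory queue replaces the ActiveMQ broker. The actual classes may behave differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.function.Consumer;

public class LongConnectionSketch {

    /** Hypothetical stand-in for com.boarsoft.message.bean.Message. */
    static class Message {
        String id;
        String target;
        String content;
    }

    /** Hypothetical converter; the real one maps to and from javax.jms.Message. */
    static class Converter {
        // Invented wire format: id, target, content separated by tabs
        String toWire(Message m) {
            return m.id + "\t" + m.target + "\t" + m.content;
        }

        Message fromWire(String wire) {
            String[] parts = wire.split("\t", 3);
            Message m = new Message();
            m.id = parts[0];
            m.target = parts[1];
            m.content = parts[2];
            return m;
        }
    }

    /** One destination: producer side enqueues, consumer side hands messages to the handler. */
    static class Destination {
        // In-memory queue standing in for the broker queue
        final Queue<String> wire = new ArrayDeque<>();
        final Converter converter;
        final Consumer<Message> handler;

        Destination(Converter converter, Consumer<Message> handler) {
            this.converter = converter;
            this.handler = handler;
        }

        void send(Message m) {
            wire.add(converter.toWire(m));
        }

        void deliverPending() {
            String payload;
            while ((payload = wire.poll()) != null) {
                handler.accept(converter.fromWire(payload));
            }
        }
    }

    /** Client that picks the destination by the message target, like the destMap property. */
    static class Client {
        final Map<String, Destination> destMap = new HashMap<>();

        void put(Message m) {
            Destination d = destMap.get(m.target);
            if (d == null) {
                throw new IllegalArgumentException("No destination for target: " + m.target);
            }
            d.send(m);
        }
    }

    /** Sends one message to "queue1" and returns what the handler received. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Client client = new Client();
        Destination queue1 = new Destination(new Converter(), msg -> received.add(msg.content));
        client.destMap.put("queue1", queue1);

        Message m = new Message();
        m.id = UUID.randomUUID().toString();
        m.target = "queue1";
        m.content = "Hello world!";
        client.put(m);

        queue1.deliverPending();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the configuration above the producer and the consumer are separate applications that each hold their own client bean and share only the broker, so the explicit `deliverPending()` call here corresponds to the broker pushing messages to the consumer's listener.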