mac-message - 示例
点击下载
消息应用示例
|
简化版消息中间件
|
简化版消息管控
|
依赖JAR包与源码下载
基于RPC的收发
生产者:queues.xml
生产者:consume.xml
生产者: RpcPubTest.java
Message m = new Message();
m.setTarget("queue1");
m.setContent("Hello world!");
m.setId(RandomUtil.genUUID());
m.addRecipient("queue1Consumer1");
log.info("Send message {}", m);
// 此时,注入给Messenger的是localMsgSvc1则不是amqClient1
Messenger.send(m);
中间件:queues.xml
中间件:consumer.xml
中间件:provider.xml
消费者:provide.xml
消费者:Queue1Consumer1.java
@Component("queue1Consumer1")
public class Queue1Consumer1 implements MessageService {
private static final Logger log = LoggerFactory.getLogger(Queue1Consumer1.class);
@Override
public void put(Message msg) throws Exception {
log.info("Received message: {}", JsonUtil.from(msg));
}
}
ActiveMQ/JMS收发
消费者、生产者、中间件的 queues.xml 与前述基于RPC的方式相同。不再需要 consume.xml 和 provide.xml
在中间件增加配置文件 activemq.xml,内容如下:
基于ActiveMQ/JMS可以有多种方式实现消息的收发,这里分别给出短连接和长连接的两个示例。
短连接方式消息发送: AmqPubTest1.java
// 可以使用ActiveMQ API来创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 也可以使用JMS API来创建连接工厂
// JmsConnectionFactory factory = new
// JmsConnectionFactory("amqp://127.0.0.1:5672");
// JmsConnectionFactory factory = new
// JmsConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection("admin", "admin");
// connection.setClientID("demo");
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue1");
final MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try {
Message m = new Message();
m.setTarget("queue1");
m.setContent("Hello world!");
// 采用ActiveMQ/JMS方式时,是否需要设置收件人,取决于消息者如何处理
// m.addRecipient("queue1Consumer1");
m.setId(RandomUtil.genUUID());
log.info("Send message {}", m);
TextMessage tm = session.createTextMessage(m.toJson());
producer.send(tm);
// ObjectMessage om = session.createObjectMessage(m);
// producer.send(om);
} finally {
connection.close();
}
长连接方式需要在生产者和消费都都增加 activemq.xml,内容如下:
长连接方式消息发送: AmqPubTest2.java
final MessageService client = (MessageService) ctx.getBean("amqClient1");
Message m = new Message();
m.setTarget("queue1");
m.setContent("Hello world!");
m.setId(RandomUtil.genUUID());
client.put(m);