mq新api订阅实现
标签:nbsp ethos cat def rgs 订阅 try 绑定 null
mq连接
public class MessageUtils {
//获取mq的连接 public static Connection getConnection() throws IOException, TimeoutException { //定义一个连接工厂 ConnectionFactory factory = new ConnectionFactory();
//设置服务地址 factory.setHost("127.0.0.1");
//设置端口 factory.setPort(5672);
//设置路径 factory.setVirtualHost("/user_num1");
//设置用户名 factory.setUsername("user1");
//设置密码 factory.setPassword("user1");
return factory.newConnection(); } }
|
定义消息生产者
public class MessageProduction {
private static final String QUER_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException{ //获取一个连接 Connection connection = MessageUtils.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //声明交换机 //fanout 分发 channel.exchangeDeclare(QUER_NAME,"fanout");
//发送消息 String msg = "liujiang "; channel.basicPublish(QUER_NAME,"",null,msg.getBytes());
System.out.println("send"+msg); //关闭通道 channel.close(); connection.close(); } }
|
定义消息消费者
public class MessageConsumer {
private static final String QUER_NAME1 = "test_simple_queue2"; private static final String QUER_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //获取连接 Connection connection = MessageUtils.getConnection(); //创建频道 final Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUER_NAME1,false,false,false,null); //绑定队列到交换机转发器 channel.queueBind(QUER_NAME1,QUER_NAME,""); //保证每次之分发一个 channel.basicQos(1); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ //获取消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body, "UTF-8"); System.out.println("消费者[1] msg:"+message.getBytes());
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //自动应答 boolean autoAck = false; //监听队列 channel.basicConsume(QUER_NAME1, autoAck, consumer); }
}
|
mq新api订阅实现
标签:nbsp ethos cat def rgs 订阅 try 绑定 null
原文地址:https://www.cnblogs.com/wsxmiss/p/13219381.html
文章来自:
搜素材网的
编程语言模块,转载请注明文章出处。
文章标题:
mq新api订阅实现
文章链接:http://soscw.com/index.php/essay/39579.html
评论