hornetq 入门(1)
2020-12-13 02:12
阅读:332
标签:des style blog class code java
HornetQ 版本 2.4.0.Final,需要 JDK 7 及以上
Hornetq官网
Hornetq2.1中文手册
step1.启动服务端
1.1准备配置文件(配置说明参考官网手册)
hornetq-configuration.xml

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <name>HornetQ.main.config</name>
    <bindings-directory>F:/hornetq/data/messaging/bindings</bindings-directory>
    <large-messages-directory>F:/hornetq/data/messaging/largemessages</large-messages-directory>
    <paging-directory>F:/hornetq/data/messaging/paging</paging-directory>
    <journal-directory>F:/hornetq/journal</journal-directory>
    <journal-min-files>10</journal-min-files>
    <id-cache-size>9000</id-cache-size>
    <jmx-management-enabled>true</jmx-management-enabled>
    <message-counter-enabled>true</message-counter-enabled>
    <message-counter-max-day-history>7</message-counter-max-day-history>
    <message-counter-sample-period>60000</message-counter-sample-period>
    <persistence-enabled>true</persistence-enabled>
    <cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
    <cluster-password>test65525</cluster-password>
    <connectors>
        <connector name="connector-netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="use-nio" value="true" />
            <param key="host" value="localhost"/>
            <param key="port" value="11212" />
        </connector>
        <connector name="netty-ssl-connector">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="localhost"/>
            <param key="port" value="5500"/>
            <param key="ssl-enabled" value="true"/>
            <param key="key-store-path" value="F:/ssl/keystore"/>
            <param key="key-store-password" value="test"/>
        </connector>
    </connectors>
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="use-nio" value="true" />
            <param key="host" value="0.0.0.0,127.0.0.1,localhost"></param>
            <param key="port" value="11212" />
        </acceptor>
        <acceptor name="netty-ssl-acceptor">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="localhost"/>
            <param key="port" value="5500"/>
            <param key="ssl-enabled" value="true"/>
            <param key="key-store-path" value="F:/ssl/keystore"/>
            <param key="key-store-password" value="test"/>
            <param key="trust-store-path" value="F:/ssl/truststore"/>
            <param key="trust-store-password" value="test"/>
        </acceptor>
    </acceptors>
    <address-settings>
        <address-setting match="jms.queue.#">
            <redelivery-delay>5000</redelivery-delay>
            <expiry-address>jms.queue.expiryQueue</expiry-address>
            <last-value-queue>true</last-value-queue>
            <max-size-bytes>100000</max-size-bytes>
            <page-size-bytes>20000</page-size-bytes>
            <redistribution-delay>0</redistribution-delay>
            <address-full-policy>PAGE</address-full-policy>
            <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
            <dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address>
            <max-delivery-attempts>3</max-delivery-attempts>
        </address-setting>
    </address-settings>
    <security-settings>
        <security-setting match="jms.queue.#">
            <permission type="createDurableQueue" roles="guest" />
            <permission type="deleteDurableQueue" roles="guest" />
            <permission type="createNonDurableQueue" roles="guest" />
            <permission type="deleteNonDurableQueue" roles="guest" />
            <permission type="consume" roles="guest" />
            <permission type="send" roles="guest" />
        </security-setting>
    </security-settings>
</configuration>

hornetq-jms.xml

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
    <connection-factory name="ConnectionFactory">
        <connectors>
            <connector-ref connector-name="connector-netty" />
        </connectors>
        <entries>
            <entry name="ConnectionFactory" />
            <entry name="/ConnectionFactory" />
            <entry name="XAConnectionFactory" />
            <entry name="/XAConnectionFactory" />
            <entry name="java:/ConnectionFactory"/>
            <entry name="java:/XAConnectionFactory"/>
        </entries>
        <retry-interval>1000</retry-interval>
        <retry-interval-multiplier>1.5</retry-interval-multiplier>
        <max-retry-interval>60000</max-retry-interval>
        <reconnect-attempts>1000</reconnect-attempts>
        <confirmation-window-size>1048576</confirmation-window-size>
    </connection-factory>
    <queue name="notificationsQueue">
        <entry name="/queue/notificationsQueue"></entry>
    </queue>
    <queue name="testQueue">
        <entry name="/queue/testQueue"/>
        <selector string="color='red'"/>
        <durable>true</durable>
    </queue>
    <queue name="deadLetterQueue">
        <entry name="/queue/deadLetterQueue"/>
    </queue>
</configuration>

1.2 启动hornetq服务

public static void startHornetqServer(){ try { //config hornetq-configuration.xml FileConfiguration config = new FileConfiguration(); config.start(); //HornetQServer HornetQServer server=HornetQServers.newHornetQServer(config); //JNPServer StandaloneNamingServer standalone=new StandaloneNamingServer(server); standalone.setBindAddress("0.0.0.0"); standalone.setRmiBindAddress("0.0.0.0"); standalone.start(); //JMSServer hornetq-jms.xml jmsServer=new JMSServerManagerImpl(server); jmsServer.start(); //start hornetq core server server.start(); System.out.println(jmsServer.isStarted()); } catch (Exception e) { e.printStackTrace(); } }

step2.发送消息客户端

/** * @param args */ public static void main(String[] args) { try { Properties prop = new Properties(); prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory"); prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099"); prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces"); prop.setProperty(Context.SECURITY_PRINCIPAL,"guest"); prop.setProperty(Context.SECURITY_CREDENTIALS, "guest"); Context ctx = new InitialContext(prop); System.out.println("+++++++1111ssssssss"); //查找目标地址 Destination destination = (Destination)ctx.lookup("/queue/notificationsQueue"); System.out.println("+++++++2222"+destination); //根据上下文查找一个连接工厂 QueueConnectionFactory 。 //该连接工厂是由JMS提供的,不需我们自己创建,每个厂商都为它绑定了一个全局JNDI,我们通过它的全局JNDI便可获取它; //ConnectionFactory 对应hornetq-jms.xml里面的 connection-factory name="ConnectionFactory" ConnectionFactory factory = (ConnectionFactory)ctx.lookup("ConnectionFactory"); System.out.println("+++++++3333"+factory); //从连接工厂得到一个连接 create QueueConnection Connection conn = factory.createConnection(); System.out.println("+++++++4444"+conn); conn.start(); //通过连接来建立一个会话(Session); javax.jms.Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE); //根据会话以及目标地址来建立消息生产者MessageProducer (QueueSender和TopicPublisher都扩展自MessageProducer接口) MessageProducer producer = session.createProducer(destination); TextMessage msg = session.createTextMessage("ffffffffffffffffffffffffffffffffffffffffffffff小心呈现出"); BytesMessage byteMessage=session.createBytesMessage(); byteMessage.writeBytes("testddddddddd".getBytes("utf-8")); producer.send(msg); producer.send(byteMessage); System.out.println("send over !!!!!"); session.close(); conn.close(); System.out.println("send down==="); } catch (Exception e) { e.printStackTrace(); } }

step3.接收消息客户端

/**
 * Opens a JMS connection and registers a {@link ReceiveMessage} listener on every
 * queue named in {@code destinationJNDI}.
 *
 * <p>The connection factory is looked up under the JNDI name {@code "ConnectionFactory"}.
 * If that lookup yields {@code null} the constructor returns without connecting. A queue
 * name whose lookup yields {@code null} is skipped. A {@link JMSException} is printed to
 * standard error and not rethrown.
 *
 * @param destinationJNDI JNDI names of the queues to consume from
 */
public MessageReceive(String... destinationJNDI) {
    QueueConnectionFactory factory = (QueueConnectionFactory) getJNDIRemoteObj("ConnectionFactory");
    try {
        if (factory == null) {
            return;
        }
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        for (int i = 0; i < destinationJNDI.length; i++) {
            destination = (Queue) getJNDIRemoteObj(destinationJNDI[i]);
            if (destination == null) {
                continue;
            }
            // Despite its name, the "producer" field holds the consumer for this queue.
            producer = session.createConsumer(destination);
            // Receive messages asynchronously through the listener.
            producer.setMessageListener(new ReceiveMessage());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}


/**
 * Looks up an object by JNDI name on the naming server at {@code jnp://127.0.0.1:1099},
 * authenticating as {@code guest}.
 *
 * <p>A fresh naming context is created for each call and closed before returning.
 *
 * @param jndiName the JNDI name to resolve
 * @return the bound object, or {@code null} if a {@link NamingException} occurred
 *         (the exception is printed to standard error)
 */
public static Object getJNDIRemoteObj(String jndiName) {
    Context context = null;
    try {
        Properties prop = new Properties();
        prop.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
        prop.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        prop.setProperty(Context.SECURITY_PRINCIPAL, "guest");
        prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");
        context = new InitialContext(prop);
        return context.lookup(jndiName);
    } catch (NamingException e) {
        e.printStackTrace();
        return null;
    } finally {
        if (context != null) {
            try {
                context.close();
            } catch (NamingException ignored) {
                // Best-effort cleanup: the lookup outcome is already decided at this point.
            }
        }
    }
}


public class ReceiveMessage implements MessageListener { @SuppressWarnings("deprecation") @Override public void onMessage(Message message) { System.out.println("Received notification:"+new Date().toLocaleString()); try { // Enumeration propertyNames = message.getPropertyNames(); // while (propertyNames.hasMoreElements()) // { // String propertyName = (String)propertyNames.nextElement(); // System.err.format(" %s: %s\n", propertyName, message.getObjectProperty(propertyName)); // } HornetQDestination des=(HornetQDestination) message.getJMSDestination(); if(message instanceof TextMessage){ TextMessage mesg=(TextMessage)message; System.out.println(des.getAddress()+"==received:"+mesg.getText()); }else if(message instanceof BytesMessage){ BytesMessage mesg=(BytesMessage)message; ByteArrayOutputStream out=new ByteArrayOutputStream(((Long)mesg.getBodyLength()).intValue()); try { byte[] r=new byte[2048]; int i=0; while((i=mesg.readBytes(r))!=-1) out.write(r,0,i); System.out.println(des.getClass()+"==received:"+new String(out.toByteArray(),"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } try { out.close(); } catch (IOException e) { e.printStackTrace(); } } if(message instanceof HornetQObjectMessage){ HornetQObjectMessage object=(HornetQObjectMessage)message; Object msgObj=object.getObject(); if(msgObj instanceof ErrorMessageBO){ ErrorMessageBO messageBO=(ErrorMessageBO)msgObj; String msg=messageBO.getMessageContent(); System.err.println("error:==>"+msg); } } } catch (JMSException e) { e.printStackTrace(); } System.out.println("----------end--------------"); }

hornetq 入门(1),搜素材,soscw.com
hornetq 入门(1)
标签:des style blog class code java
原文地址:http://www.cnblogs.com/qwj888/p/3716023.html
评论
亲,登录后才可以留言!