`
gong1208
  • 浏览: 557047 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java 消息服务(JMS)介绍­­­­——使用spring与activemq

    博客分类:
  • jms
阅读更多

 


                   Java 
消息服务(JMS)介绍

                                                                     ­­­­——使用springactivemq

 

第一次写技术文章,所以不懂规矩,写得也没什么条理,主要都是根据自己的一些实际操作经验写的,

以下代码都经过实际验证,其中也有自己不甚明了之处,欢迎大家拍砖或者交流。

 

关于jms,我打算会有一系列的文章,这里只是其中一节,

本节主要介绍如何在spring框架下配置使用jms,jms提供者使用activemq。
该配置相关说明:
1. 消息发送者与接受者分离,分别配置在两个独立的配置文件中
2. 启用一个消息服务器,其中产生一个queue一个topic,两个生产者分别往其中发送对象,发送时采用spring提供的转换器,可以实现java对象与jms消息的相互转化。
3. 消费者总共三个,一个消费queue中的消息,两个消费topic中的消息,我们称之为订阅者,其中一个订阅者是持久性订阅者,我们知道对于普通的订阅者来说,当该订阅者处于非活动期时topic中产生的消息是无法再传送给订阅者的(除非是实体化消息),但是持久化订阅者是可以收到其在非活动期间topic中产生的消息的(从其在服务器上注册时开始至现在)
4. 发送时使用的是同步发送方式,即发送者发送消息到服务器,等待服务器发送确认消息表示发送成功,发送者可以继续发送消息,消费者或者订阅者使用的是监听器方式,所以是采用异步接收方式,即接受者不需要一直阻塞直到接收消息,而是jms服务器有消息到达时会触发消息监听器的一个动作。当我们需要使用同步接收方式时就需要像同步发送消息一样使用jmstemplate。同时这里消息的确认方式采用默认的AUTO_ACKNOWLEDGE方式,即自动确认,即服务器收到消息立马发送确认消息,同样接收者收到消息会立即向服务器发送确认消息。需要注意的是,这里的同步异步,都只是描述客户端与服务器端之间的关系,而不是这发送者-服务器-接受者三者间的关系。


下面我们看具体的配置
activemq-produce.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

<!-- <context:property-placeholder location="classpath:server-config.properties" ignore-unresolvable="true"></context:property-placeholder>
-->
   <!--  指定发送端连接的activeMQ服务器-->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--  指定发送的目的地的类型和名字-->
    <bean id="MyQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="firstQueue"></constructor-arg>
    </bean>

    <bean id="Topic-A" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="Topic-A"></constructor-arg>
    </bean>

    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.gy.myactivemq.DefaultMessageConverter"/>
<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    <!--  Spring JmsTemplate config,即消息发送模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!--  lets wrap in a pool to avoid creating a connection per send -->
            <!--  单例模式,避免每次发送消息时新产生一个连接 -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="connectionFactory"/>
            </bean>
        </property>
         <!-- custom MessageConverter -->
        <property name="messageConverter" ref="simpleMessageConverter"/>
    </bean>

   <!-- POJO which send Message uses  Spring JmsTemplate ,使用消息模板发送消息-->
    <bean id="queueMessageProducer" class="com.gy.myactivemq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate"/>
        <property name="destination" ref="MyQueue"/>
    </bean>

    <bean id="topicMessageProducer" class="com.gy.myactivemq.TopicMessageProducer">
        <property name="template" ref="jmsTemplate"/>
        <property name="destination" ref="Topic-A"/>
    </bean>

</beans>
其中com.gy.myactivemq.QueueMessageProducer的java代码如下:
public class QueueMessageProducer {
    private JmsTemplate template;
    private Queue destination;
    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }
    public void setDestination(Queue destination) {
        this.destination = destination;
    }
    public void send(FooMessage message) {
        template.convertAndSend(this.destination, message);
    }
    public void sendByMe(String mess) {
        template.convertAndSend(this.destination, mess);
    }
}
其中包括两个可以用来发送消息的方法sendByMe和send,可以分别用来发送不同类型的消息,如果想要发送其他类型的消息可以自己定于,然后在单元测试中调用方式如下:
@Test
public void sendTest(){
ApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-produce.xml");
QueueMessageProducer qmp=(QueueMessageProducer) ctx.getBean("queueMessageProducer");
TopicMessageProducer tmp=(TopicMessageProducer) ctx.getBean("topicMessageProducer");
FooMessage message=new FooMessage();
message.setId(123);
qmp.send(message);
tmp.send(message);
}

activemq-consumer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
     http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

<!-- <context:property-placeholder location="classpath:server-config.properties" ignore-unresolvable="true"></context:property-placeholder>
-->
    <!-- 用于连接activeMQ服务器 -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientIDPrefix" value="www"/>
</bean>

<!-- ActiveMQ destinations ,连接的目标名称-->
    <bean id="MyQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="firstQueue"></constructor-arg>
    </bean>

    <bean id="Topic-A" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="Topic-A"></constructor-arg>
    </bean>


    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

<bean id="queueListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="topicListenerB"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerB" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="MyQueue" />
<property name="messageListener" ref="queueListener" />
</bean>
<bean id="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerA" />
<property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/>
        <property name="durableSubscriptionName" value="clientId_001"/>
</bean>

<bean id="topicListenerContainerB"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerB" />
</bean>
<!-- consumer for queue -->
   <bean id="queueConsumer" class="com.gy.myactivemq.QueueConsumer"/>

<!-- consumer for topic -->
   <bean id="topicConsumerA" class="com.gy.myactivemq.TopicConsumerA" />
   <bean id="topicConsumerB" class="com.gy.myactivemq.TopicConsumerB" />

</beans>

需要注意的是这段配置:
<bean id="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerA" />
<property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/>
        <property name="durableSubscriptionName" value="clientId_001"/>
</bean>
其中subscriptionDurable属性表明该处于该container中的订阅者是一个持久订阅者,配置持久订阅者必须指定一个clientId的值,而且这个值对于每一个订阅者都必须是唯一的,因为jms服务器要根据每个订阅者的这个clientId为其注册,这样jms服务器才能确保在该订阅者不活动时为其保存消息。还注意到如下配置:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientIDPrefix" value="www"/>
</bean>
其中<property name="clientIDPrefix" value="www"/>配置指明每个连接到jms服务器的连接的clientId的前缀是www,然后具体这个配置的用处现在还不清楚,之前网上说这个配置表明只有当连接的jms服务器的客户端的clientId名的前缀与这个配置一致时,该连接才能实现持久化订阅,但实际情况是这里不管要不要这个配置,订阅者只要指定任意的clientId值都可以实现持久订阅。

对于这段配置我们可以发现对于每一个listener监听者我们都需要配置一个container容器,这里我们可以通过jms标签简化这个配置,比如若有两个listener,topicListenerA和topicListenerB配置如下:
  <jms:listener-container connection-factory="connectionFactory" concurrency="1" destination-type="topic">
  <jms:listener destination="Topic-A" ref="topicListenerA" />
   <jms:listener destination="Topic-A" ref="topicListenerB" />
  </jms:listener-container>
其中topicListenerA和topicListenerB是在前面的配置文件中已经配好,当然我们也可以直接在<jms:listener destination="Topic-A" ref="topicListenerA" />标签中配置,这样下面这段代码也可以省了
<bean id="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
具体配置可以参阅《java消息服务》一书。
使用jms标签需要加入如下命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
     http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
总之,这样就省去了一个listener需要一个container的麻烦,如果需要配置持久订阅者则需要如下配置:
   <!-- 弊端:一个listener-container若指定了client-id和destination-type来实现持久化订阅,则只能含有一个listener,因为每个listener的client-id必须唯一 -->
   <jms:listener-container connection-factory="connectionFactory" concurrency="1" client-id="clientId_001" destination-type="durableTopic">
   <jms:listener destination="Topic-A" ref="topicListenerA" />
   </jms:listener-container>
这么做的弊端已经在注释上标明,当然也许不是这样,但是目前个人没有发现如何将持久订阅者和非持久订阅者放在同一个container中。另外destination-type属性用于指定监听的消息提供者类型,默认是queue,所以监听topic时需要指定类型。

开启消息监听的代码很简单:
public static void main(String [] agr){
ApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-consumer.xml");
}
注意这里不使用单元测试的@Test是因为,使用单元测试的方法运行之后马上会停止该方法,导致连接之后马上会断开,消息可能都还没有被监听者收到,所以我们需要保持监听者一直与jms服务器连接。

这里,需要用的jar包有三个必须的,分别是spring-jms,activemq-all,slf4j相关的包(slf4j-simple,slf4j-spi),当然spring读取配置文件的相关包肯定也是需要的,用过spring的应该都知道。

另外,activemq的启动是单独启动的,也可以通过如下方式在spring配置中启动:
<amq:broker useJmx="false" persistent="true">
<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="d:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:61616" />
</amq:transportConnectors>
</amq:broker>
但是不推荐这种方式,建议分开来启动。

 

版权所有,转载请注明来源:

http://gong1208.iteye.com/blog/1558367

分享到:
评论
3 楼 u011710489 2017-02-20  
我的配置是这样的:
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  />


<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 
  <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  <!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>


<!-- 消息消费者 start-->

<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"/>
<jms:listener destination="test.queue" ref="queueReceiver2"/>
</jms:listener-container>

<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="topic1">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
</jms:listener-container>
<!--<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory2" acknowledge="auto" client-id="topic2">
<jms:listener destination="test.topic" ref="topicReceiver2"/>
</jms:listener-container>

--><!-- 消息消费者 end -->
2 楼 u011710489 2017-02-20  
ERROR 8976 --- [erContainer#2-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'test.topic' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: setClientID call not supported on proxy for shared Connection. Set the 'clientId' property on the SingleConnectionFactory instead.
1 楼 u011710489 2017-02-20  
你好,为什么我启动报错呢setClientID call not supported on proxy for shared Connection

相关推荐

Global site tag (gtag.js) - Google Analytics