虽然JMS一般都和异步处理相关,但它也可以同步的方式使用消息。可重载的receive(..)
方法提供了这种功能。在同步接收中,接收线程被阻塞直至获得一个消息,有可能出现线程被无限阻塞的危险情况。属性receiveTimeout指定了接收器可等待消息的延时时间。
类似于EJB世界里流行的消息驱动bean(MDB),消息驱动POJO(MDP)作为JMS消息的接收器。MDP的一个约束(但也请看下面的有关javax.jms.MessageListener
类的讨论)是它必须实现javax.jms.MessageListener
接口。另外当你的POJO将以多线程的方式接收消息时必须确保你的代码是线程-安全的。
以下是MDP的一个简单实现:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ExampleListener implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage) { try { System.out.println(((TextMessage) message).getText()); } catch (JMSException ex) { throw new RuntimeException(ex); } } else { throw new IllegalArgumentException("Message must be of type TextMessage"); } } }
一旦你实现了MessageListener
后就可以创建一个消息侦听容器。
请看下面例子是如何定义和配置一个随Sping发行的消息侦听容器的(这个例子用DefaultMessageListenerContainer
)
<!-- this is the Message Driven POJO (MDP) --> <bean id="messageListener" class="jmsexample.ExampleListener" /> <!-- and this is the attendant message listener container --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="concurrentConsumers" value="5"/> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="messageListener" /> </bean>
关于各个消息侦听容器实现的特色请参阅相关的Spring Javadoc文档。
SessionAwareMessageListener
接口是一个Spring专门用来提供类似于JMS MessageListener
的接口,也提供了从接收Message
来访问JMS Session
的消息处理方法。
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的MDP可以响应所有接收到的消息(使用onMessage(Message, Session)
方法提供的Session
)那么你可以选择让你的MDP实现这个接口(优先于标准的JMS MessageListener
接口)。所有随Spring发行的支持MDP的消息侦听容器都支持MessageListener
或SessionAwareMessageListener
接口的实现。要注意的是实现了SessionAwareMessageListener
接口的类通过接口和Spring有了耦合。是否选择使用它完全取决于开发者或架构师。
请注意SessionAwareMessageListener
接口的'onMessage(..)'
方法会抛出JMSException
异常。和标准JMS MessageListener
接口相反,当使用SessionAwareMessageListener
接口时,客户端代码负责处理任何抛出的异常。
MessageListenerAdapter
类是Spring的异步支持消息类中的不变类(final class):简而言之,它允许你几乎将任意一个类做为MDP显露出来(当然有某些限制)。
如果你使用JMS 1.0.2 API,你将使用和MessageListenerAdapter
一样功能的类MessageListenerAdapter102
。
考虑如下接口定义。注意虽然这个接口既不是从MessageListener
也不是从SessionAwareMessageListener
继承来得,但通过MessageListenerAdapter
类依然可以当作一个MDP来使用。同时也请注意各种消息处理方法是如何根据他们可以接收并处理消息的内容来进行强类型匹配的。
public interface MessageDelegate { void handleMessage(String message); void handleMessage(Map message); void handleMessage(byte[] message); void handleMessage(Serializable message); }
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别请注意,上面的MessageDelegate
接口(上文中DefaultMessageDelegate
类)的实现完全不依赖于JMS。它是一个真正的POJO,我们可以通过如下配置把它设置成MDP。
<!-- this is the Message Driven POJO (MDP) --> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="jmsexample.DefaultMessageDelegate"/> </constructor-arg> </bean> <!-- and this is the attendant message listener container... --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="concurrentConsumers" value="5"/> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="messageListener" /> </bean>
下面是另外一个只能处理接收JMSTextMessage
消息的MDP示例。注意消息处理方法是如何实际调用'receive'
(在MessageListenerAdapter
中默认的消息处理方法的名字是'handleMessage'
)的,但是它是可配置的(你下面就将看到)。注意'receive(..)'
方法是如何使用强制类型来只接收和处理JMS TextMessage
消息的。
public interface TextMessageDelegate { void receive(TextMessage message); }
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
辅助的MessageListenerAdapter
类配置文件类似如下:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果上面的'messageListener'
收到一个不是TextMessage
类型的JMS Message
,将会产生一个IllegalStateException
异常(随之产生的其他异常只被捕获而不处理)。
MessageListenerAdapter
还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应消息
。
请看下面的接口及其实现:
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果上面的DefaultResponsiveTextMessageDelegate
和MessageListenerAdapter
联合使用,那么任意从执行'receive(..)'
方法返回的非空值都将(缺省情况下)转换成一个TextMessage
。这个返回的TextMessage
将被发送到原来的Message
中JMS Reply-To属性定义的目的地
(如果存在),或者是MessageListenerAdapter
设置(如果配置了)的缺省目的地
;如果没有定义目的地
,那么将产生一个InvalidDestinationException
异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。
参与到事务中只需要一点微小的改动。你需要创建一个事务管理器,并且注册到一个可以参与事务的子类中(DefaultMessageListenerContainer
或ServerSessionMessageListenerContainer
)。
为了创建事务管理器,你需要创建一个JmsTransactionManager
的实例并提供给它一个支持XA事务功能的连接工厂。
<bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="connectionFactory" /> </bean>
然后你只需要把它加入到我们先前的容器配置中。容器会处理其他的事情。
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="5" />
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="messageListener" />
<property name="transactionManager" ref="transactionManager" />
</bean>