Essaie de tester la persistance de la file d'attente. Ne peut pas envoyer des messages lorsque le point d'arrêt de la consommation

Je suis en train de tester la file d'attente de la persistance de ActiveMQ.

J'ai intégré ActiveMQ serveur avec un consommateur unique.
Ce serveur embarqué recevoir des messages JMS de beaucoup d'autres JVM applications.

Il fonctionne très bien, le consommateur demande de recevoir les notifications.

Donc j'ai essayé de tester la persistance des messages.
J'ai mis (à distance) point d'arrêt sur la MessageListener du consommateur, de sorte que je peux mettre en file d'attente de messages et de faire le ActiveMQ crash du serveur.
Sur le redémarrage du serveur, j'aimerais que tous les mis en file d'attente des messages pour pouvoir être consommés, et de ne pas être perdu.

Et puis j'ai essayé de test.
Je suis arrivé à ce point d'arrêt sur le premier message envoyer.
Mais pour tous les messages que j'ai essayer de l'envoyer, j'ai le stacktrack sur le producteur:

Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:612)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:604)
at com.xxxxxxxxxxx.mobilepush.client.RealClientTest.main(RealClientTest.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1380)
at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1466)
at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:308)
at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
... 9 more
Caused by: java.io.IOException: Wire format negotiation timeout: peer did not send his wire format.
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:98)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1351)
... 13 more

Je ne comprends pas pourquoi mon producteur serait bloqué quand ma consommation est de mon point d'arrêt.

Mon courtier uri est: mobilepush.activemq.broker.transport.connector.uri=tcp://0.0.0.0:61616

Le producteur se connecte via le protocole tcp pour le courtier.
Le consommateur, en colocation avec le courtier, se connecte par le biais vm://localhost.


Ma configuration est assez simple:

SERVER:
<!--  lets create an embedded ActiveMQ Broker -->
<amq:broker useJmx="false" persistent="true">
<amq:transportConnectors>
<amq:transportConnector uri="${mobilepush.activemq.broker.transport.connector.uri}" />
</amq:transportConnectors>
<amq:persistenceAdapter>
<amq:kahaPersistenceAdapter directory="${mobilepush.activemq.broker.queue.persistence.directory}" maxDataFileLength="100 Mb"/>
</amq:persistenceAdapter>
</amq:broker>

CONSUMER:
(management namespace and xebia class it only a JMX decorator)
<bean id="connectionFactory" class="fr.xebia.management.jms.SpringManagedConnectionFactory">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory" >
<property name="brokerURL" value="${mobilepush.activemq.broker.uri}"/>
</bean>
</property>
</bean>
<bean id="pushConsumer" class="com.xxxxxxxxxxxxxxx.mobilepush.messaging.jms.PushConsumer">
<property name="jmsPushMessageConverter" ref="jmsPushMessageConverter"/>
<property name="pushDelegate" ref="directPushDelegate"/>
</bean>
<management:executor-service id="pushConsumerExecutor"
pool-size="${mobilepush.consumer.thread.min}-${mobilepush.consumer.thread.max}" keep-alive="60" />
<jms:listener-container
task-executor="pushConsumerExecutor"
connection-factory="connectionFactory"
acknowledge="auto"
container-class="fr.xebia.springframework.jms.ManagedDefaultMessageListenerContainer">
<jms:listener destination="mobilepush.queue" ref="pushConsumer" method="onMessage" />
</jms:listener-container>

PRODUCER:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"  >
<property name="brokerURL" value="${mobilepush.activemq.broker.uri}"/>
</bean>
<bean id="mobilePushJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestination" ref="mobilePushQueue"/>
<property name="messageConverter" ref="jmsPushMessageConverter"/>
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory">
<ref local="connectionFactory" />
</property>
</bean>
</property>
</bean>

OriginalL'auteur Sebastien Lorber | 2012-09-04