Comment faire en sorte que ce message de file d'attente JMS est livré à l'externe WebService (CXF)?

La question

De la façon de configurer ActiveMQ et <flow> dans Mule ESB 3.2, afin de s'assurer que le message tiré de la file d'attente se termine correctement manipulés par l'extérieur CXF service?

Scénario

J'ai un CXF point de terminaison, ce qui devrait prendre des messages entrants et de les transférer à trois services externes dès que possible. Appelons-EX1, EX2, EX3. C'est assez facile, grâce à la <all> composant introduit dans Mule 3.x.

L'exigence la plus importante de l'ensemble de la solution est de s'assurer que chaque message reçu finit par être remis à tous les trois CXF services. Nous avons donc fini avec l'idée, à mettre chaque message entrant dans Persistent JMS queues (Q1, Q2, Q3). Après un message est lu à partir de la file d'attente de Qn, il est transféré directement à l'correspondant EXn point de terminaison, et donc - service externe.

Config

(Je peux fournir toute la config sur demande)

Nous avons configuré courtier ActiveMQ, comme décrit ici et filaire avec notre <flow> config. Tout semble fonctionner comme prévu, j'ai JConsole connecté à ma demande donc je ne peux voir que les messages sont de type PERSISTANT et qu'ils finissent dans la bonne file d'attente. Si tout se passe bien: les messages sont reçus par les trois services EXn.

Tests

Le problème arrises lorsque nous éteignons l'un des services, disons EX2, et de redémarrer le serveur tout entier, la simulation de l'échec. Le message finit par être perdu (je suppose que ce n'est pas que la persistance de l', hein?).
Le plus curieux, c'est - Si nous avons envoyé 10 messages lors de l'EX2 est bas, après redémarrage du serveur, 9 d'entre eux sont correctement rupture de stock! Donc je pense que peut-être, juste peut-être, 9 de ces 10 messages ont été correctement mis en file d'attente, alors que l'un était constamment en rupture de stock lorsque le serveur a échoué en bas.

Cela me fait penser, que CXF point de terminaison n'est pas géré avec l'appui de transaction, dont je ne comprends pas, pour être honnête. Après tout ce que je peux voir le message dans la file d'attente lorsqu'il essaye d'être en rupture de stock, donc il doit être conservé. C'est clairement pas, mais pourquoi?

Mes propres tentatives
J'ai essayé un certain nombre de choses, dont aucun n'a fonctionné. Toujours un message est perdu.

  1. De ne pas utiliser toute <jms:transaction /> des balises dans le flux n'a pas de travail
  2. De départ jms transaction à recevoir message, joindre lors de l'envoi de <cxf:jaxws-client />
  3. De l'utilisation de XA avec JBoss et <xa-transaction /> - n'a pas de travail
  4. Fournir <default-exception-strategy> config - Si je me souviens, il fait des choses pires

Toute aide est appréciée, merci.

CONFIG

ACTIVE MQ CONFIGURATION

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>

FLUX - envoi de message entrant à 3 files d'attente Qn

<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>

De DÉBIT - poignée de livraison de Qn à EXn

<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>

Points de terminaison - déclaration des points de terminaison

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint> 
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>
Toujours pas de solution, je commence un bounty. Je vais accepter toute réponse avec un exemple de message persistant avec CXF et active MQ remplir ce scénario en deux flux: IN_CXF -> FILE; FILE -> OUT_CXF

OriginalL'auteur | 2012-01-24