Comment faire en sorte que ce message de file d'attente JMS est livré à l'externe WebService (CXF)?
La question
De la façon de configurer ActiveMQ
et <flow>
dans Mule ESB 3.2
, afin de s'assurer que le message tiré de la file d'attente se termine correctement manipulés par l'extérieur CXF service
?
Scénario
J'ai un CXF point de terminaison, ce qui devrait prendre des messages entrants et de les transférer à trois services externes dès que possible. Appelons-EX1, EX2, EX3. C'est assez facile, grâce à la <all>
composant introduit dans Mule 3.x.
L'exigence la plus importante de l'ensemble de la solution est de s'assurer que chaque message reçu finit par être remis à tous les trois CXF services. Nous avons donc fini avec l'idée, à mettre chaque message entrant dans Persistent JMS queues
(Q1, Q2, Q3). Après un message est lu à partir de la file d'attente de Qn, il est transféré directement à l'correspondant EXn point de terminaison, et donc - service externe.
Config
(Je peux fournir toute la config sur demande)
Nous avons configuré courtier ActiveMQ, comme décrit ici et filaire avec notre <flow>
config. Tout semble fonctionner comme prévu, j'ai JConsole connecté à ma demande donc je ne peux voir que les messages sont de type PERSISTANT et qu'ils finissent dans la bonne file d'attente. Si tout se passe bien: les messages sont reçus par les trois services EXn.
Tests
Le problème arrises lorsque nous éteignons l'un des services, disons EX2, et de redémarrer le serveur tout entier, la simulation de l'échec. Le message finit par être perdu (je suppose que ce n'est pas que la persistance de l', hein?).
Le plus curieux, c'est - Si nous avons envoyé 10 messages lors de l'EX2 est bas, après redémarrage du serveur, 9 d'entre eux sont correctement rupture de stock! Donc je pense que peut-être, juste peut-être, 9 de ces 10 messages ont été correctement mis en file d'attente, alors que l'un était constamment en rupture de stock lorsque le serveur a échoué en bas.
Cela me fait penser, que CXF point de terminaison n'est pas géré avec l'appui de transaction, dont je ne comprends pas, pour être honnête. Après tout ce que je peux voir le message dans la file d'attente lorsqu'il essaye d'être en rupture de stock, donc il doit être conservé. C'est clairement pas, mais pourquoi?
Mes propres tentatives
J'ai essayé un certain nombre de choses, dont aucun n'a fonctionné. Toujours un message est perdu.
- De ne pas utiliser toute
<jms:transaction />
des balises dans le flux n'a pas de travail - De départ jms transaction à recevoir message, joindre lors de l'envoi de
<cxf:jaxws-client />
- De l'utilisation de XA avec JBoss et
<xa-transaction />
- n'a pas de travail - Fournir
<default-exception-strategy>
config - Si je me souviens, il fait des choses pires
Toute aide est appréciée, merci.
CONFIG
ACTIVE MQ CONFIGURATION
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>
FLUX - envoi de message entrant à 3 files d'attente Qn
<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>
De DÉBIT - poignée de livraison de Qn à EXn
<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>
Points de terminaison - déclaration des points de terminaison
<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint>
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>
OriginalL'auteur | 2012-01-24
Vous devez vous connecter pour publier un commentaire.
Consommer des messages JMS dans une transaction est un must pour la solution de travail comme prévu: si une exception se produit dans le CXF sortant phase, JMS message va finir par être restaurée, puis rupture de stock, le déclenchement d'une nouvelle CXF appel.
Vous devez configurer le renvoi d'ActiveMQ client afin de réessayer assez de temps et peut-être pas trop vite (exponentielle par exemple). Vous aussi vous souhaitez gérer le DLQ de façon appropriée. ActiveMQ de la configuration du client avec le Printemps et les Haricots dans la Mule est indiqué: http://www.mulesoft.org/mule-activemq-integration-examples
Veillez également à consulter pour le bon courtier URL dans votre configuration d'usine. Avec votre courtier nom de esb-amq-courtier, votre configuration usine devrait être:
Ok, j'ai ajouté config. Si vous êtes curieux au sujet de pl.exception à la règle.de recherche.Composant, c'est juste un composant qui tente de prévoir si une exception est levée dans le prochain processeur de débit. J'ai aussi essayé de tester la solution sans elle, car j'ai pensé qu'il pourrait ombre sous-jacente des exceptions.
Ok, je vais régler ma config et de publier les résultats ici. Une question cependant: est - il ok que ma config manque de <défaut-exception-stratégie>, et explicite les restaurations? Je suppose que la transaction sera annulée automatiquement après CXF point de terminaison indique une exception, ou je me trompe?
OK, donc je pense que la Mule est en train de faire la bonne chose. Quand je veux une forte garantie de ne JMS message, je lance mon fournisseur JMS qui ne sont pas intégrées (c'est à dire autonome et en HA mode (cluster). Maintenant, pour votre cas particulier, le nom du courtier décalage est peut-être la question: vous êtes potentiellement à partir d'une seconde dans la mémoire courtier de l'usine de raccordement.
Je faisais allusion à ActiveMQ HA options, pas de Mule. Si JMS courtier HA, c'est bien si Mule ne l'est pas.
OriginalL'auteur David Dossot
Je ne sais pas si je vais beaucoup vous aider, mais c'est un couple de suggestions au sujet de votre problème:
Bonne chance
HTH
jérôme
OriginalL'auteur romje
Ne sais pas si cette considération est utile, mais ce sur la reconnaissance des modes? Serait-il possible que le message a été livré déjà (en auto reconnaissez mode) mais n'était pas encore correctement traitées par les consommateurs de terminaison de service?
Aucune idée de comment configurer la reconnaissance explicite dans ce scénario, mais peut-être la peine d'examiner de plus.
OriginalL'auteur Michael Ibbeken