Signal d'une restauration à partir d'un JMS MessageListener
J'ai travaillé avec JMS et ActiveMQ. Tout fonctionne à merveilles. Je ne suis pas à l'aide de printemps, ni I.
L'interface javax.jms.MessageListener
a une seule méthode, onMessage
. Dans une mise en œuvre, il y a une chance, une exception sera levée. En effet, si une exception est lancée, alors je me dis que le message n'a pas été correctement traitée et doit être re-essayé. Donc, j'ai besoin ActiveMQ à attendre un peu de temps et puis, une nouvelle tentative. c'est à dire j'ai besoin de la levée d'une exception à la restauration de la JMS transaction.
Comment puis-je réaliser un tel comportement?
Peut-être il ya la configuration dans ActiveMQ, je n'ai pas réussi à en trouver.
Ou... peut-être pourrait faire disparaître avec de l'inscription MessageListener
s des consommateurs et de consommer les messages moi-même, dans une boucle comme:
while (true) {
//... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
//... some more administrative stuff
}
dans un couple de fils, au lieu d'enregistrer l'écouteur.
Ou... j'arrivais à les décorer/AOP/octet-manipuler les MessageListener
s à le faire eux-mêmes.
La route qu'aimeriez-vous prendre et pourquoi?
note: je n'ai pas le plein contrôle sur la MessageListener
s code.
MODIFIER
Un test pour la preuve de concept:
@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
//A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
- Merci beaucoup whaley et @Ammar pour les réponses. Je suis upvoting les deux puisque vous deux m'a orienté dans la bonne voie. Mais pas de prélèvement d'un droit de réponse. Parce que d'autres tests sont nécessaires.
Vous devez vous connecter pour publier un commentaire.
Si vous souhaitez utiliser SESSION_TRANSACTED que votre accusé de réception de la mode, alors vous avez besoin pour l'installation d'un RedeliveryPolicy sur votre Connexion/ConnectionFactory. Cette page sur ActiveMQ du site web contient également quelques bonnes infos pour ce que vous pourriez avoir à faire.
Puisque vous n'utilisez pas le Printemps, vous pouvez configurer un RedeliveryPolicy avec quelque chose de similaire pour le code suivant (à partir d'un des liens ci-dessus):
Modifier
En prenant votre extrait de code ajouté à la réponse, ce qui suit montre comment cela fonctionne avec les transactions. Essayez ce code avec la Session.rollback() la méthode commentée et vous verrez que l'utilisation de SESION_TRANSACTED et de la Session.commit/rollback fonctionne comme prévu:
}
Vous devez définir l'accusé de réception mode de Session.CLIENT_ACKNOWLEDGE, le client reconnaît avoir consommé message en appelant le message de reconnaître méthode.
QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Puis, après le traitement du message à besoin d'appeler le Message.reconnaissez() méthode pour supprimer ce message.
Si votre session est traitée,puis "acknowledgeMode" est ignoré de toute façon..Donc, il suffit de laisser votre session transactionnelles et session d'utilisation.la restauration et la session.s'engager à valider ou d'annuler votre transaction.