Signal d'une restauration à partir d'un JMS MessageListener

J'ai travaillé avec JMS et ActiveMQ. Tout fonctionne à merveilles. Je ne suis pas à l'aide de printemps, ni I.

L'interface javax.jms.MessageListener a une seule méthode, onMessage. Dans une mise en œuvre, il y a une chance, une exception sera levée. En effet, si une exception est lancée, alors je me dis que le message n'a pas été correctement traitée et doit être re-essayé. Donc, j'ai besoin ActiveMQ à attendre un peu de temps et puis, une nouvelle tentative. c'est à dire j'ai besoin de la levée d'une exception à la restauration de la JMS transaction.

Comment puis-je réaliser un tel comportement?

Peut-être il ya la configuration dans ActiveMQ, je n'ai pas réussi à en trouver.

Ou... peut-être pourrait faire disparaître avec de l'inscription MessageListeners des consommateurs et de consommer les messages moi-même, dans une boucle comme:

while (true) {
    //... some administrative stuff like ...
    session = connection.createSesstion(true, SESSION_TRANSACTED)
    try {
        Message m = receiver.receive(queue, 1000L);
        theMessageListener.onMessage(m);
        session.commit();
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    //... some more administrative stuff
}

dans un couple de fils, au lieu d'enregistrer l'écouteur.

Ou... j'arrivais à les décorer/AOP/octet-manipuler les MessageListeners à le faire eux-mêmes.

La route qu'aimeriez-vous prendre et pourquoi?

note: je n'ai pas le plein contrôle sur la MessageListeners code.

MODIFIER
Un test pour la preuve de concept:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
//A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
  • Merci beaucoup whaley et @Ammar pour les réponses. Je suis upvoting les deux puisque vous deux m'a orienté dans la bonne voie. Mais pas de prélèvement d'un droit de réponse. Parce que d'autres tests sont nécessaires.