ActiveMQ et intégré courtier
EDIT: Reformuler la question:
Je veux utiliser ActiveMQ comme un service de messager entre mon serveur et des applications du client.
Je suis en train de mettre en place un intégré à un courtier (c'est à dire pas d'un processus séparé) dans le serveur pour gérer le produit des messages pour mes clients à consommer. Cette file d'attente est conservée.
Le courtier d'initialisation comme suit:
BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();
Après bricoler, j'ai fini avec la partie serveur:
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); //apparently the vm part is all i need
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
Le client est très similaire et ressemble à ceci:
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");
Connection connection = connectionFactory.createConnection(); //exception happens here...
connection.start();
connection.setExceptionListener(this);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("*****Received: " + text);
} else {
System.out.println("*****Received obj: " + message);
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
La principale méthode commence simplement chacun de ces dans un thread pour démarrer la production/réception de messages.
...mais je suis en cours d'exécution dans la suite avec le début de chaque thread:
2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
java.rmi.server.ExportException: internal error: ObjID already in use
at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
at sun.rmi.transport.Transport.exportObject(Transport.java:92)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
<snip....>
Il semble que les messages sont produits et consommés avec succès (les autres questions que j'ai déjà posté au sujet a été résolu), mais l'exception ci-dessus qui m'inquiète.
EDIT: au Cours de courtier d'arrêt, je suis maintenant aussi accueillis par le texte suivant:
2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
at java.lang.Thread.run(Thread.java:722)
Pour clarifier, le message affiché est de niveau de DÉBOGAGE. Ce n'est pas nécessairement une ERREUR. Il pourrait juste être instructif. Vous rencontrez une erreur dans le fait de produire/consommer des messages? La question n'est pas claire sur ce point.
J'ai reformulé la question complètement. Essentiellement, je demande que les 3 sous-questions. (1) L'exception d'une, (2) des messages Perdus et (3) la Persévérance. Merci pour regarder ma question.
On dirait que vous êtes à l'aide d'une très ancienne version de ActiveMQ que son défaut AMQ persistance adaptateur, vous conseille de passer à la version 5.8.0.
Voulez-vous exécuter un embedded courtier (c'est à dire ActiveMQ lui-même) à l'intérieur de votre serveur ou juste un JMS producteur/consommateur? Dans le premier cas, merci de poster aussi des détails sur la façon dont vous essayé de lancer ce courtier.
OriginalL'auteur Jaco Van Niekerk | 2013-01-23
Vous devez vous connecter pour publier un commentaire.
Vous pouvez incorporer un courtier dans votre code dans un certain nombre de façons, beaucoup de qui est documenté ici. Vous pouvez essayer la mise à niveau depuis la version que vous utilisez semble être assez vieux car il défaut maintenant obsolète AMQ Magasin au lieu de la plus récente KahaDB magasin. Vous pourriez avoir des problèmes à cause d'une course entre le client et le fils comme ils utilisent les différentes fabriques de connexions qui pourraient course pour créer la VM courtiers. Si vous définissez la créer=false option sur le producteur et assurer le thread consommateur commence après que pourrait répondre à la question, ou vous pouvez créer la VM courtier à l'avance et de les ajouter à créer=false à la fois du thread et qui pourrait faire l'affaire.
Et ensuite dans le code du client il suffit de le joindre via cette connexion configuration d'usine.
L'exception semble être de vous dire que le client de connexion a échoué car vous arrêter le courtier alors que le client est toujours connecté. Pas un énorme problème si vous êtes à la fermeture de l'application. Il ressemble à votre client à l'aide de TCP au lieu de VM dans son usine de raccordement, bien que son pas entièrement claire de la question.
Oui... c'est bien la cause du problème et l'exception peut être ignoré en toute sécurité. Pour éviter cela, les clients doit être éteint avant que le serveur (qui comprennent le courtier).
OriginalL'auteur Tim Bish
Lorsque j'exécute le code, j'ai eu le dessous exception:
Votre courtier est en cours d'exécution et à l'écoute de port 61616, de sorte que tout client qui tente de se connecter à un courtier besoin d'avoir le port dans son URL.
Le code de client tente de se connecter à localhost, mais ne mentionne pas le port auquel il doit se connecter.
À la fois le producteur et le consommateur, le code doit être fixe.
À
Après fixation du port, j'ai été en mesure d'exécuter votre code.
OriginalL'auteur Satish