ActiveMQ et intégré courtier

EDIT: Reformuler la question:

Je veux utiliser ActiveMQ comme un service de messager entre mon serveur et des applications du client.

Je suis en train de mettre en place un intégré à un courtier (c'est à dire pas d'un processus séparé) dans le serveur pour gérer le produit des messages pour mes clients à consommer. Cette file d'attente est conservée.

Le courtier d'initialisation comme suit:

BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();

Après bricoler, j'ai fini avec la partie serveur:

public static class HelloWorldProducer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); //apparently the vm part is all i need
            Connection connection = connectionFactory.createConnection(); 
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

Le client est très similaire et ressemble à ceci:

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
    public void run() {
        try {
          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");
            Connection connection = connectionFactory.createConnection(); //exception happens here...
            connection.start();
            connection.setExceptionListener(this);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("*****Received: " + text);
            } else {
                System.out.println("*****Received obj: " + message);
            }
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

La principale méthode commence simplement chacun de ces dans un thread pour démarrer la production/réception de messages.

...mais je suis en cours d'exécution dans la suite avec le début de chaque thread:

2013-01-24 07:54:31,271 INFO  [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO  [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
java.rmi.server.ExportException: internal error: ObjID already in use
at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
at sun.rmi.transport.Transport.exportObject(Transport.java:92)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
<snip....>

Il semble que les messages sont produits et consommés avec succès (les autres questions que j'ai déjà posté au sujet a été résolu), mais l'exception ci-dessus qui m'inquiète.

EDIT: au Cours de courtier d'arrêt, je suis maintenant aussi accueillis par le texte suivant:

2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
at java.lang.Thread.run(Thread.java:722)
Vous devez inclure tous les broker code de création afin que nous puissions voir ce que tout ce que vous avez configurés. Vous pouvez désactiver JMX si vous n'avez pas besoin par l'intermédiaire d'un courtier.setUseJmx(false);
Pour clarifier, le message affiché est de niveau de DÉBOGAGE. Ce n'est pas nécessairement une ERREUR. Il pourrait juste être instructif. Vous rencontrez une erreur dans le fait de produire/consommer des messages? La question n'est pas claire sur ce point.
J'ai reformulé la question complètement. Essentiellement, je demande que les 3 sous-questions. (1) L'exception d'une, (2) des messages Perdus et (3) la Persévérance. Merci pour regarder ma question.
On dirait que vous êtes à l'aide d'une très ancienne version de ActiveMQ que son défaut AMQ persistance adaptateur, vous conseille de passer à la version 5.8.0.
Voulez-vous exécuter un embedded courtier (c'est à dire ActiveMQ lui-même) à l'intérieur de votre serveur ou juste un JMS producteur/consommateur? Dans le premier cas, merci de poster aussi des détails sur la façon dont vous essayé de lancer ce courtier.

OriginalL'auteur Jaco Van Niekerk | 2013-01-23