Kafka Démarrage rapide: Quelles sont les Dépendances de ai-je besoin?

Je suis en train de travailler à travers le kafka de démarrage rapide:

http://kafka.apache.org/07/quickstart.html

et la base de Consommation de Groupe exemple:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Groupe+Exemple

J'ai codé le Consommateur et ConsumerThreadPool comme ci-dessus:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream m_stream;
    private Integer m_threadNumber;

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Un couple de d'autres facettes: je suis à l'aide de printemps pour gérer mon zookeeper:

import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig createConsumerConfig() {
        String zookeeperAddress = "127.0.0.1:2181";
        String groupId = "inventory";
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperAddress);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
}

Et je compile avec Maven et le OneJar plugin Maven. Cependant, je compilez et exécutez l'résultant d'un bocal j'obtiens l'erreur suivante:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more

Maintenant, je sais peu de choses à propos de Kafka, et rien sur Scala. Comment puis-je résoudre ce problème? Que devrais-je essayer? Est-ce un problème connu? Ai-je besoin d'autres dépendances? Voici la kafka version dans mon pom.xml:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency> 

Mise à jour: j'ai contacté le Kafka dev mailing list, et ils me permettent de savoir spécifique à la version requise pour la scala de dépendances. Cependant, il est également un sans-papiers log4j de la dépendance, qui se traduit dans une autre exécution, pas le temps de compilation, d'exception.

Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)

Une Autre Mise À Jour:

J'ai trouvé le bon log4j dépendance:

    <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

Mais je me suis rencontré avec encore plus énigmatique exception runtime:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

À ce point, j'ai eu le WTF genre de sentiment. J'ai donc ajouté une autre dépendance:

    <dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>

Mais cet exposé encore une autre exception runtime:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

J'espère être en mesure d'obtenir ce bébé exemple et en cours d'exécution, mais c'est peut-être le prix à payer pour l'utilisation de bêta produits? Je devrais peut-être passer à Apache Active MQ. Mais ça a l'air moins fun. Ai-je raté quelque chose?

OriginalL'auteur David Williams | 2013-08-27