Kafka Démarrage rapide: Quelles sont les Dépendances de ai-je besoin?
Je suis en train de travailler à travers le kafka de démarrage rapide:
http://kafka.apache.org/07/quickstart.html
et la base de Consommation de Groupe exemple:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Groupe+Exemple
J'ai codé le Consommateur et ConsumerThreadPool comme ci-dessus:
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;
public class Consumer implements Runnable {
private KafkaStream m_stream;
private Integer m_threadNumber;
public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
Un couple de d'autres facettes: je suis à l'aide de printemps pour gérer mon zookeeper:
import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {
@Bean
@Named("consumerConfig")
private static ConsumerConfig createConsumerConfig() {
String zookeeperAddress = "127.0.0.1:2181";
String groupId = "inventory";
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperAddress);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
}
Et je compile avec Maven et le OneJar plugin Maven. Cependant, je compilez et exécutez l'résultant d'un bocal j'obtiens l'erreur suivante:
Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more
Maintenant, je sais peu de choses à propos de Kafka, et rien sur Scala. Comment puis-je résoudre ce problème? Que devrais-je essayer? Est-ce un problème connu? Ai-je besoin d'autres dépendances? Voici la kafka version dans mon pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency>
Mise à jour: j'ai contacté le Kafka dev mailing list, et ils me permettent de savoir spécifique à la version requise pour la scala de dépendances. Cependant, il est également un sans-papiers log4j de la dépendance, qui se traduit dans une autre exécution, pas le temps de compilation, d'exception.
Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
Une Autre Mise À Jour:
J'ai trouvé le bon log4j dépendance:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
Mais je me suis rencontré avec encore plus énigmatique exception runtime:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
À ce point, j'ai eu le WTF genre de sentiment. J'ai donc ajouté une autre dépendance:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
Mais cet exposé encore une autre exception runtime:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
J'espère être en mesure d'obtenir ce bébé exemple et en cours d'exécution, mais c'est peut-être le prix à payer pour l'utilisation de bêta produits? Je devrais peut-être passer à Apache Active MQ. Mais ça a l'air moins fun. Ai-je raté quelque chose?
OriginalL'auteur David Williams | 2013-08-27
Vous devez vous connecter pour publier un commentaire.
Le problème est que kafka bêta a été construit de manière à ce que les pom généré avec un pot n'est pas valide et maven ne pouvait pas le reconnaître et analyser correctement, donc l'extraction de dépendances transitives. Nous avons réussi à atténuer ce problème en faisant appel à toutes les dépendances de cette pom (scala, zk, etc) dans notre pom définition. Nous sommes en attente de la prochaine versions bêta de kafka, dans laquelle le problème sera résolu.
Plein dépendances liste est ci-dessous. Notez que vous devez changer la scala version de dépendance en fonction de la postfix de votre kafka artefact.
Comme pour le
Bien, ne pas oublier que c'est la bêta de presse? Certaines mauvaises choses se passent, en effet, mais actuellement, nous sommes en cours d'exécution kafka 0.7 sans tous les efforts de.
on utilise kafka construit par twitter pour la 0.7. Qu'entendez-vous par programmation?
Ah, devraient être plus précis, en Java. Les seuls artefacts Maven pour Kafka sont de 0,8
om-nom-nom - que pensez-vous de ce problème? stackoverflow.com/questions/18477184/...
C'est peut-être préférable d'ajouter slf4j-jdk14 dans les dépendances.
OriginalL'auteur om-nom-nom
J'ai trouvé cette configuration de dépendances fonctionnelles:
OriginalL'auteur David Williams
Cela semble fonctionner :
(via où puis-je trouver repository maven pour kafka? )
OriginalL'auteur alex