Multi-threading avec Spring batch Rubrique d'un Fichier de Lecteur
Dans un Spring Batch je suis en train de lire un fichier CSV et que vous voulez attribuer à chaque ligne à un thread séparé et processus. J'ai essayé de le faire à l'aide de TaskExecutor, mais ce qui se passe tout le thread est de choisir la même ligne à la fois. J'ai aussi essayé de mettre en œuvre le concept à l'aide de Partioner, il y a aussi la même chose qui se passe. Veuillez voir ci-dessous ma Configuration Xml.
Étape De La Description
<step id="Step2">
<tasklet task-executor="taskExecutor">
<chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
</chunk>
</tasklet>
</step>
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<!-- split it -->
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="userid,customerId,ssoId,flag1,flag2" />
</bean>
</property>
<property name="fieldSetMapper">
<!-- map to an object -->
<bean
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="user" />
</bean>
</property>
</bean>
</property>
</bean>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<property name="concurrencyLimit" value="4"/>
J'ai essayé avec différents types d'exécuteur de tâches, mais ils sont tous de se comporter de la même manière. Comment puis-je attribuer à chaque ligne à un thread séparé?
Vous pouvez consulter cette stackoverflow.com/questions/20243629/...
OriginalL'auteur slowhandblues | 2014-01-22
Vous devez vous connecter pour publier un commentaire.
FlatFileItemReader n'est pas thread-safe. Dans votre exemple, vous pouvez essayer de diviser le fichier CSV pour les petits fichiers CSV et ensuite utiliser un MultiResourcePartitioner de traiter chacun d'entre eux. Cela peut être fait en 2 étapes, une pour le fractionnement du fichier d'origine(de l'ordre de 10 fichiers plus petits) et l'autre pour le traitement des fichiers splittés.De cette façon, vous n'aurez pas de problèmes étant donné que chaque dossier sera traité par un seul thread.
Exemple:
L'alternative au lieu de partitionnement peut être Personnalisé Thread-safe Lecteur qui permettra de créer un thread pour chaque ligne, mais probablement de partitionnement est votre meilleur choix
Certainement le partitionnement et c'est parce qu'une lecture personnalisée traitera toujours ligne par ligne.D'autre part, de nombreux petits fichiers csv sera procesed simultanément(partitionné-étape).Gardez à l'esprit qu'il ya beaucoup d'autres facteurs, à l'exception de mise à l'échelle techinques pour l'optimisation des performances comme jouer avec la validation de l'intervalle, de sauter et de relance de la politique, en général, chaque cas a ses propres blocages. Espérons que ça aide!
Grand, il est sûr à utiliser MultiThreadedFlatFileItemReader à distance de partitionnement ? github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/...
Je crois, et comme indiqué ici, vous devez être très prudent lorsque les ressources des lecteurs sont(local pour les esclaves). Avec précaution, j'imagine que ça va fonctionner et si non, une question peut résoudre le problème 🙂 (je n'ai pas essayer le Lecteur que vous avez mentionné, mais à partir d'un rapide coup d'oeil est vraiment à portée de main)
Si nous utilisons MultiResourcePartitioner, comment doit-être le lecteur de configuration ? peut-on utiliser FlatFileItemReader avec ressource #{stepExecutionContext[nom de fichier]} ? ou nous avons besoin d'utiliser MultiResouceItemReader ?
OriginalL'auteur dimzak
Vous êtes le problème, c'est que le lecteur n'est pas dans le champ d'application de l'étape .
Qui signifie : toutes les threads partagent le même Flux d'entrée (fichier de Ressources).
D'avoir pour chaque thread d'une ligne de processus, vous devez :
à la fin du fichier (Chaque thread doit ouvrir le flux et de le fermer pour
chaque contexte d'exécution )
contexte d'exécution.
Je écrire un peu de code et c'est la sortie :
Code de
com.test.partitioner.RangePartitioner
classe :--> Chercher à la sortie de la console
De Départ : Thread1
fromId : 1
toId : 1
De Départ : Thread2
fromId : 2
toId : 2
De Départ : Thread3
fromId : 3
toId : 3
De Départ : Thread4
fromId : 4
toId : 4
De Départ : Thread5
fromId : 5
toId : 5
De Départ : Thread6
fromId : 6
toId : 6
De Départ : Thread7
fromId : 7
toId : 7
De Départ : Thread8
fromId : 8
toId : 8
De Départ : Thread9
fromId : 9
toId : 9
De Départ : Thread10
fromId : 10
toId : 10
Regarder la configuration ci-dessous :
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
TODO : Changer mon lecteur sur d'autres qui lisent à la position (de début et de fin de poste) comme avec Scanner de Classe en java.
Espérons que cette aide.
gridSize
variable initialisée?OriginalL'auteur yahyayouness
Vous pouvez diviser votre fichier d'entrée pour de nombreux fichiers , l'utilisation de Partitionner et de charger des fichiers de petite taille avec des threads, mais en cas d'erreur , vous devez redémarrer tous les emplois après DB nettoyé.
Plein exemple de code de travail (sur Github)
Espérons que cette aide.
OriginalL'auteur M. Mohamed