Le printemps de l'Intégration à l'arrivée-Canal-Adaptateur pour lire les gros fichiers ligne par ligne
Je suis actuellement à l'aide de Spring Integration 4.1.0 avec le Printemps 4.1.2.
J'ai une exigence pour être capable de lire un fichier ligne par ligne et l'utilisation de chaque ligne lue comme un message. Fondamentalement, je veux permettre à "replay" pour l'un de nos sources de messages, mais les messages ne sont pas enregistrés dans des fichiers individuels, mais plutôt dans un seul fichier. Je n'ai pas de conditions de transaction pour ce cas d'utilisation.
Mes exigences sont semblables à la présente publication, sauf sur un fichier résidant sur le même serveur que celui que la JVM est en cours d'exécution sur: spring integration - lire un fichier distant ligne par ligne
Que je le vois j'ai les options suivantes:
1. Utilisation int-file:inbound-channel-adapter
pour lire le fichier, puis "split" ce fichier pour que le message 1 message devient maintenant plusieurs messages.
Exemple de fichier de configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
Le problème est que le fichier est très volumineux et lors de l'utilisation de la technique ci-dessus l'ensemble du fichier est lu en mémoire et est ensuite divisé et la JVM est à court de mémoire. Vraiment les étapes sont les suivantes: lire une ligne et de convertir de la ligne de message, envoyer un message, supprimer un message de la mémoire, répétez.
-
Utilisation
int-file:tail-inbound-channel-adapter
avecend="false"
(qui représente la lecture à partir du début du fichier). Démarrer et arrêter cet adaptateur nécessaires pour chaque fichier (modifier le nom de fichier avant chaque départ).
Exemple de fichier de configuration:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:\p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" /> <int:channel id="exchangeSpringQueueChannel" /> <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" /> </beans>
-
Ont de Printemps de l'Intégration d'appel dans le Ressort du Lot et de l'utilisation d'un
ItemReader
pour traiter le fichier. Certainement permet plus fine des contrôles sur l'ensemble du processus, mais une certaine quantité de travail pour le programme d'installation avec l'emploi du référentiel et de la ces (et je ne se soucient pas de l'histoire de job alors j'aimerais raconter l'emploi de ne pas enregistrer l'état et/ou de l'utilisation de la mémoireMapJobRepository
).
4. Créer mon propre FileLineByLineInboundChannelAdapter
en étendant MessageProducerSupport
.
Une grande partie du code peut être emprunté auprès de ApacheCommonsFileTailingMessageProducer
(voir aussi http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Ci-dessous est un échantillon, mais a besoin de quelques travaux pour mettre la lecture en son propre Thread
de sorte que j'ai l'honneur le stop()
commande alors que j'ai lu ligne par ligne.
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
//Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
//TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
//and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
//TODO Auto-generated method stub
return null;
}
}
Il ne semble pas pour moi que mon cas d'utilisation est en dehors du domaine des choses typiques gens à le faire, je suis surpris de voir que je ne peux pas trouver une solution out-of-the-box. J'ai cherché un peu, cependant, et regardé beaucoup d'exemples et n'ont malheureusement pas encore de trouver quelque chose que les suites à mes besoins.
Je suppose que peut-être que j'ai manqué quelque chose d'évident que le cadre propose déjà (mais peut-être cela tombe dans le floue la ligne entre le Printemps Integraton et Spring Batch). Quelqu'un peut-il me faire savoir si je suis totalement hors de la base avec mes idées, ou si il ya une solution simple que j'ai raté, ou de proposer d'autres suggestions?
OriginalL'auteur Tony Falabella | 2014-11-21
Vous devez vous connecter pour publier un commentaire.
Printemps De L'Intégration 4.x a une belle nouvelle fonctionnalité de l'utilisation d'un Itérateur de messages:
Spring Integration De Référence
Cela permet d'envoyer Itérateur messages n'ayant pas la lecture du fichier dans la mémoire.
Voici un exemple simple de Printemps Contexte fractionnement des fichiers CSV en un seul message par ligne:
Et c'est le répartiteur de mise en œuvre:
Veuillez noter que l'Itérateur de l'objet retourné par la splitMessage() méthode de gestionnaire d'.
Point intéressant! J'ai été le débogage des problèmes similaires (même fichier en cours de lecture en boucle) les deux dernières heures. Le lecteur doit être fermé, en effet, mais votre changement n'a pas résolu mon problème actuel. Je soupçonne que la fichier:entrant-canal-adaptateur maintient l'envoi d'un message avec le même fichier sur et plus (si ce n'est à régler à prévenir les doublons="true" sur l'entrée du canal à la carte). Aussi il serait bon de supprimer le fichier après qu'il a été traité.
J'ai pris la liberté de mettre à jour l'extrait de code avec les modifications qui m'ont aidé à terminer le traitement d'un fichier au Printemps XD environnement. @TonyFalabella il devrait être compatible avec vos besoins (juste un commentaire de fichier.delete() si pas nécessaire).
Ici, c'est même un meilleur Répartiteur de mise en œuvre: stackoverflow.com/questions/27171978/...
J'ai regardé Artem réponse et comme pour faire autant que possible dans SpEL, sans écrire de code donc je vais aller avec sa suggestion. Merci encore Smollet.
OriginalL'auteur Sergey Shcherbakov
Je suis aussi la , je suis aussi de copier les fichiers vers un autre dossier et la lecture des données à partir d'un fichier aussi
fileCopyApplicationContext.xml
FileHandler.java
SpringIntegrationFileCopyExample.java
OriginalL'auteur Usman Yaqoob