Le printemps de l'Intégration à l'arrivée-Canal-Adaptateur pour lire les gros fichiers ligne par ligne

Je suis actuellement à l'aide de Spring Integration 4.1.0 avec le Printemps 4.1.2.
J'ai une exigence pour être capable de lire un fichier ligne par ligne et l'utilisation de chaque ligne lue comme un message. Fondamentalement, je veux permettre à "replay" pour l'un de nos sources de messages, mais les messages ne sont pas enregistrés dans des fichiers individuels, mais plutôt dans un seul fichier. Je n'ai pas de conditions de transaction pour ce cas d'utilisation.
Mes exigences sont semblables à la présente publication, sauf sur un fichier résidant sur le même serveur que celui que la JVM est en cours d'exécution sur: spring integration - lire un fichier distant ligne par ligne

Que je le vois j'ai les options suivantes:

1. Utilisation int-file:inbound-channel-adapter pour lire le fichier, puis "split" ce fichier pour que le message 1 message devient maintenant plusieurs messages.
Exemple de fichier de configuration:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>

Le problème est que le fichier est très volumineux et lors de l'utilisation de la technique ci-dessus l'ensemble du fichier est lu en mémoire et est ensuite divisé et la JVM est à court de mémoire. Vraiment les étapes sont les suivantes: lire une ligne et de convertir de la ligne de message, envoyer un message, supprimer un message de la mémoire, répétez.

  1. Utilisation int-file:tail-inbound-channel-adapter avec end="false" (qui représente la lecture à partir du début du fichier). Démarrer et arrêter cet adaptateur nécessaires pour chaque fichier (modifier le nom de fichier avant chaque départ).
    Exemple de fichier de configuration:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:\p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
    
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
  2. Ont de Printemps de l'Intégration d'appel dans le Ressort du Lot et de l'utilisation d'un ItemReader pour traiter le fichier. Certainement permet plus fine des contrôles sur l'ensemble du processus, mais une certaine quantité de travail pour le programme d'installation avec l'emploi du référentiel et de la ces (et je ne se soucient pas de l'histoire de job alors j'aimerais raconter l'emploi de ne pas enregistrer l'état et/ou de l'utilisation de la mémoire MapJobRepository).

4. Créer mon propre FileLineByLineInboundChannelAdapter en étendant MessageProducerSupport.
Une grande partie du code peut être emprunté auprès de ApacheCommonsFileTailingMessageProducer (voir aussi http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Ci-dessous est un échantillon, mais a besoin de quelques travaux pour mettre la lecture en son propre Thread de sorte que j'ai l'honneur le stop() commande alors que j'ai lu ligne par ligne.

    package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
//Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
//TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
//and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
//TODO Auto-generated method stub
return null;
}
}

Il ne semble pas pour moi que mon cas d'utilisation est en dehors du domaine des choses typiques gens à le faire, je suis surpris de voir que je ne peux pas trouver une solution out-of-the-box. J'ai cherché un peu, cependant, et regardé beaucoup d'exemples et n'ont malheureusement pas encore de trouver quelque chose que les suites à mes besoins.

Je suppose que peut-être que j'ai manqué quelque chose d'évident que le cadre propose déjà (mais peut-être cela tombe dans le floue la ligne entre le Printemps Integraton et Spring Batch). Quelqu'un peut-il me faire savoir si je suis totalement hors de la base avec mes idées, ou si il ya une solution simple que j'ai raté, ou de proposer d'autres suggestions?

OriginalL'auteur Tony Falabella | 2014-11-21