Comment un pool de threads être réutilisés après l'arrêt
J'ai un .fichier csv contenant plus de 70 millions de lignes dont chaque ligne est de générer un Praticable, puis exécuté par le pool de threads. Cet Exécutable va insérer un enregistrement dans Mysql.
Qui plus est , je veux enregistrer une position du fichier csv pour la RandomAccessFile à localiser. La position est écrit dans un Fichier.Je veux écrire cet enregistrement lorsque tous les threads dans le pool de threads sont finis.Donc ThreadPoolExecutor.shutdown() est invoquée. Mais quand plus de lignes de venir, j'ai besoin d'un pool de threads de nouveau. Comment puis-je réutiliser ce courant de pool de threads au lieu d'en faire une nouvelle.
Le code est comme suit:
public static boolean processPage() throws Exception {
long pos = getPosition();
long start = System.currentTimeMillis();
raf.seek(pos);
if(pos==0)
raf.readLine();
for (int i = 0; i < PAGESIZE; i++) {
String lineStr = raf.readLine();
if (lineStr == null)
return false;
String[] line = lineStr.split(",");
final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
//System.out.println("userId: "+log.getUserId()%512);
pool.execute(new Runnable(){
public void run(){
try {
experienceService.insertExperienceLog(log);
} catch (BaseException e) {
e.printStackTrace();
}
}
});
long end = System.currentTimeMillis();
}
BufferedWriter resultWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(new File(
RESULT_FILENAME), true)));
resultWriter.write("\n");
resultWriter.write(String.valueOf(raf.getFilePointer()));
resultWriter.close();
long time = System.currentTimeMillis()-start;
System.out.println(time);
return true;
}
Merci !
Vous devez vous connecter pour publier un commentaire.
Comme indiqué dans le la documentation, vous ne pouvez pas réutiliser un
ExecutorService
qui a été arrêté. Je le recommande à l'encontre de tout solutions de contournement, depuis que (a) ils peuvent ne pas fonctionner comme prévu dans toutes les situations; et (b) vous pouvez réaliser ce que vous souhaitez à l'aide de classes standard.Vous devez
instancier un nouvel
ExecutorService
; oupas la fin de la
ExecutorService
.La première solution est facilement mis en place, donc je ne vais pas le détailler.
Pour la deuxième, puisque vous voulez exécuter une action une fois que les tâches sont terminées, vous pouvez prendre un coup d'oeil à
ExecutorCompletionService
et l'utiliser à la place. Il encapsule uneExecutorService
qui se chargera de la gestion des threads, mais le runnables obtiendrez enveloppé dans quelque chose qui va dire leExecutorCompletionService
quand ils ont fini, donc il peut faire rapport à vous:La méthode
take()
sur leExecutorCompletionService
classe bloquera jusqu'à ce qu'une tâche est terminée (soit normalement ou brusquement). Il sera de retour uneFuture
, de sorte que vous pouvez vérifier les résultats si vous le souhaitez.J'espère que cela peut vous aider, car je n'ai pas tout à fait comprendre votre problème.
créer et regrouper toutes les tâches et de les soumettre à la piscine avec invokeAll (qui renvoie uniquement lorsque toutes les tâches sont effectuées avec succès)
ExecutorCompletionService
ouinvokeAll
?Après l'appel de l'arrêt sur un ExecutorService, aucune nouvelle Tâche sera accepté. Cela signifie que vous devez créer un nouveau ExecutorService pour chaque série de tâches.
Cependant, avec Java 8 ForkJoinPool.awaitQuiescence a été introduit. Si vous pouvez passer d'un ExecutorService à ForkJoinPool, vous pouvez utiliser cette méthode pour attendre jusqu'à ce que pas plus de tâches sont en cours d'exécution dans un ForkJoinPool sans avoir à appeler à l'arrêt. Cela signifie que vous pouvez remplir un ForkJoinPool avec des Tâches, en attente jusqu'à ce qu'il soit vide (au repos), puis de nouveau commencer à le remplir avec des Tâches, et ainsi de suite.