ExecutorService que les interruptions de tâches après un délai d'attente
Je suis à la recherche d'un ExecutorService de mise en œuvre qui peut être fourni avec un délai d'attente. Les tâches qui sont soumis à l'ExecutorService sont interrompus si ils prennent plus de temps que le délai pour s'exécuter. L'application d'un tel engin n'est pas une tâche difficile, mais je me demandais si quelqu'un sait d'une mise en œuvre existantes.
Voici ce que j'ai trouvé basée sur quelques éléments de la discussion ci-dessous. Tous les commentaires?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
- Est que le début du délai d'attente au moment de la soumission? Ou le moment où le groupe commence à exécuter?
- Bonne question. Quand il commence à exécuter. Sans doute à l'aide de la
protected void beforeExecute(Thread t, Runnable r)
crochet. - êtes-vous toujours à l'aide de cette solution ou a été remplacé
- L'emploi où j'ai mis en place cette solution a été remplacé. 🙂
- J'ai besoin exactement ça, sauf un) j'ai besoin de mon principal planificateur de service à un pool de threads avec un seul thread de service depuis besoin de mes tâches à exécuter strictement simultanément et b) j'ai besoin d'être en mesure de préciser le délai d'expiration de la durée de chaque tâche au moment où le groupe est soumis. J'ai essayé d'utiliser cela comme un point de départ, mais qui s'étend ScheduledThreadPoolExecutor, mais je ne vois pas un moyen d'obtenir la durée d'expiration spécifiée qui est à spécifier lors de la tâche de la soumission par le biais de la beforeExecute méthode. Toutes les suggestions grandement appréciée!
Vous devez vous connecter pour publier un commentaire.
Vous pouvez utiliser un ScheduledExecutorService pour cela. D'abord vous devez soumettre qu'une seule fois pour commencer immédiatement et de conserver l'avenir qui est créé. Après cela, vous pouvez soumettre une nouvelle tâche qui permettrait d'annuler l'oubli de l'avenir après une certaine période de temps.
Cela permettra de exécuter votre gestionnaire (principale fonctionnalité à être interrompue) pendant 10 secondes, puis annuler (c'est à dire d'interruption) qui tâche spécifique.
ExecutorCompletionService
. Je suis à la recherche dans le maintenant.beforeExecute
crochet.}, 10000, TimeUnit.MILLISECONDS);
quand vous pourriez avoir écrit}, 10, TimeUnit.SECONDS);
Malheureusement, la solution est erronée. Il y a une sorte de bug avec
ScheduledThreadPoolExecutor
, a également signalé dans cette question: annulation d'une tâche de ne pas libérer les ressources de la mémoire associé à la tâche; les ressources sont publiées uniquement lorsque la tâche expire.Si vous avez donc créer un
TimeoutThreadPoolExecutor
avec un assez long temps d'expiration (une utilisation typique), et de soumettre des tâches assez rapidement, vous vous retrouvez de remplissage de la mémoire - même si les tâches effectivement accomplies avec succès.Vous pouvez voir le problème avec la suivante (très rudimentaire) programme de test:
Le programme épuise la mémoire disponible, bien qu'il attend pour le pondu
Runnable
s à compléter.J'ai bien sur ce sujet pendant un certain temps, mais malheureusement je ne pouvais pas arriver à une bonne solution.
EDIT:
J'ai découvert ce problème a été signalé comme JDK bug 6602600, et semble avoir été résolu très récemment.
Envelopper la tâche dans FutureTask et vous pouvez définir le délai d'attente pour la FutureTask. Regardez l'exemple dans ma réponse à cette question,
java native Processus de délai d'attente
java.util.concurrent
des classes, mais je suis à la recherche d'unExecutorService
mise en œuvre.Comment sur l'utilisation de l'
ExecutorService.shutDownNow()
méthode décrite en http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html? Il semble être la solution la plus simple.Il semble que le problème n'est pas dans le JDK bug 6602600 ( il a été résolu à 2010-05-22), mais dans
appel incorrect de sommeil(10) en forme de cercle. Outre noter que le Thread principal doit donner
directement la CHANCE à d'autres threads pour réaliser leurs tâches par invoquer le SOMMEIL(0)
CHAQUE branche du cercle extérieur.
C'est mieux, je pense, pour utiliser le Thread.rendement() au lieu de Fil.sleep(0)
Le résultat corrigé partie du problème précédent code est tel comme ceci:
Il fonctionne correctement avec le montant de l'extérieur du compteur jusqu'à 150 000 000 testé cercles.
Après la tonne de temps à l'enquête
Enfin, je l'utilise
invokeAll
méthode deExecutorService
pour résoudre ce problème.Qui sera strictement interrompre la tâche alors que la tâche en cours.
Ici est un exemple de
Le pro est vous pouvez également soumettre
ListenableFuture
à la mêmeExecutorService
.Juste un peu modifier la première ligne de code.
ListeningExecutorService
est l'Écoute de la fonctionnalité deExecutorService
à google de goyave projet (com.google.goyave) )invokeAll
. Qui fonctionne très bien. Juste un mot d'avertissement pour toute personne de penser à l'aide de ceci: bien queinvokeAll
renvoie une liste deFuture
objets, il semble en fait être une opération de blocage.L'aide de John W réponse, j'ai créé une mise en œuvre bien commencer le délai d'attente lors de la tâche commence son exécution. J'ai même écrire un test unitaire pour elle 🙂
Cependant, elle ne convient pas à mes besoins puisque certaines opérations d'e /s n'interrompez pas quand
Future.cancel()
est appelé (c'est à dire quandThread.interrupted()
est appelé).Quelques exemples de IO opération qui ne peut être interrompu lorsque
Thread.interrupted()
est appelée sontSocket.connect
etSocket.read
(et je soupçonne la plupart des OI opération mise en œuvre dansjava.io
). Toutes les opérations d'e /s dansjava.nio
doit être interrompu lorsqueThread.interrupted()
est appelé. Par exemple, c'est le cas pourSocketChannel.open
etSocketChannel.read
.De toute façon si quelqu'un est intéressé, j'ai créé un résumé pour un pool de threads exécuteur testamentaire qui permet de tâches en attente (si ils sont à l'aide de interruptible d'activités...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
Socket.connect
etSocket.read
Ce à propos de cette idée alternative :
Petit échantillon ici :