Ce qui détermine le nombre de threads Java ForkJoinPool crée?
Autant que j'avais compris ForkJoinPool
, que la piscine crée un nombre fixe de threads (par défaut: nombre de cœurs) et ne pourra jamais créer plusieurs threads (à moins que l'application indique une nécessité pour celles-ci en utilisant managedBlock
).
Cependant, l'utilisation de ForkJoinPool.getPoolSize()
j'ai découvert que dans un programme qui crée de 30 000 tâches (RecursiveAction
), le ForkJoinPool
de l'exécution de ces tâches utilise 700 threads en moyenne (fils comptés chaque fois qu'une tâche est créée). Les tâches ne font pas I/O, mais pur calcul; la seule inter-tâche de synchronisation est l'appel de ForkJoinTask.join()
et l'accès AtomicBoolean
s, i.e. il n'y a pas de fil-opérations de blocage.
Depuis join()
ne pas bloquer le thread appelant comme je le comprends, il n'y a aucune raison pour qu'un thread dans la piscine doit jamais bloquer, et donc (j'avais supposé) il devrait y avoir aucune raison de créer toute autre fils (qui est de toute évidence passe quand même).
Alors, pourquoi ne ForkJoinPool
créer autant de fils? Quels facteurs déterminent le nombre de threads créés?
J'avais espéré que cette question pourrait être répondu sans poster du code, mais ici, il s'agit à la demande. Ce code est un extrait d'un programme de quatre fois la taille, réduite à l'essentiel des parties; il ne compile pas car il est. Si vous le souhaitez, je peux bien sûr post le programme complet, trop.
Le programme recherche un labyrinthe pour un chemin d'accès à partir d'un point de départ à un point de fin en utilisant la profondeur de la première recherche. Une solution de l'existence est garantie. La logique principale est dans le compute()
méthode de SolverTask
: Un RecursiveAction
qui commence à un certain point donné et se poursuit avec tous les points voisins accessible à partir du point courant. Plutôt que de créer un nouveau SolverTask
à chaque point d'embranchement (qui permettrait de créer de trop nombreuses tâches), il pousse tous ses voisins sauf sur un retour en arrière de la pile à être traitées plus tard et se poursuit avec seulement l'un des voisins ne les pousse pas à la pile. Une fois qu'il atteint une impasse de cette façon, le point le plus récemment poussé à la mandature de la pile est sortie, et la recherche se poursuit à partir de là (en coupant le chemin construit à partir de la tâche du point de départ en conséquence). Une nouvelle tâche est créée une fois qu'une tâche trouve sa mandature de la pile supérieure à un certain seuil; à partir de ce moment, la tâche, tout en continuant à la pop de sa mandature de la pile jusqu'à ce que, épuisé, ne va pas pousser les autres points à sa pile lors de l'atteinte d'un point d'embranchement, mais de créer une nouvelle tâche pour chaque point. Ainsi, la taille des tâches peut être ajustée à l'aide de la pile de seuil limite.
Les chiffres que j'ai cités ci-dessus ("tâches de 30 000, 700 threads en moyenne") qui sont à la recherche d'un labyrinthe de 5000x5000 cellules. Alors, voici l'essentiel de code:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
//Once the backtrack stack has reached this size, the current task
//will never add another cell to it, but create a new task for each
//newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;
/**
* @return Tries to compute a path through the maze from local start to end
* and returns that (or null if no such path found)
*/
@Override
public ArrayDeque<Point> compute() {
//Is this task still accepting new branches for processing on its own,
//or will it create new tasks to handle those?
boolean stillAcceptingNewBranches = true;
Point current = localStart;
ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>(); //Path from localStart to (including) current
ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
//Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
Direction[] allDirections = Direction.values();
while (!current.equals(end)) {
pathFromLocalStart.addLast(current);
//Collect current's unvisited neighbors in random order:
ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);
for (Direction directionToNeighbor: allDirections) {
Point neighbor = current.getNeighbor(directionToNeighbor);
//contains() and hasPassage() are read-only methods and thus need no synchronization
if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
}
//Process unvisited neighbors
if (neighborsToVisit.size() == 1) {
//Current node is no branch: Continue with that neighbor
current = neighborsToVisit.getFirst().getPoint();
continue;
}
if (neighborsToVisit.size() >= 2) {
//Current node is a branch
if (stillAcceptingNewBranches) {
current = neighborsToVisit.removeLast().getPoint();
//Push all neighbors except one on the backtrack stack for later processing
for(PointAndDirection neighborAndDirection: neighborsToVisit)
backtrackStack.push(neighborAndDirection);
if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
stillAcceptingNewBranches = false;
//Continue with the one neighbor that was not pushed onto the backtrack stack
continue;
} else {
//Current node is a branch point, but this task does not accept new branches any more:
//Create new task for each neighbor to visit and wait for the end of those tasks
SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
int t = 0;
for(PointAndDirection neighborAndDirection: neighborsToVisit) {
SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
task.fork();
subTasks[t++] = task;
}
for (SolverTask task: subTasks) {
ArrayDeque<Point> subTaskResult = null;
try {
subTaskResult = task.join();
} catch (CancellationException e) {
//Nothing to do here: Another task has found the solution and cancelled all other tasks
}
catch (Exception e) {
e.printStackTrace();
}
if (subTaskResult != null) { //subtask found solution
pathFromLocalStart.addAll(subTaskResult);
//No need to wait for the other subtasks once a solution has been found
return pathFromLocalStart;
}
} //for subTasks
} //else (not accepting any more branches)
} //if (current node is a branch)
//Current node is dead end or all its neighbors lead to dead ends:
//Continue with a node from the backtracking stack, if any is left:
if (backtrackStack.isEmpty()) {
return null; //No more backtracking avaible: No solution exists => end of this task
}
//Backtrack: Continue with cell saved at latest branching point:
PointAndDirection pd = backtrackStack.pop();
current = pd.getPoint();
Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
//DEBUG System.out.println("Backtracking to " + branchingPoint);
//Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
//DEBUG System.out.println(" Going back before " + pathSoFar.peekLast());
pathFromLocalStart.removeLast();
}
//continue while loop with newly popped current
} //while (current ...
if (!current.equals(end)) {
//this task was interrupted by another one that already found the solution
//and should end now therefore:
return null;
} else {
//Found the solution path:
pathFromLocalStart.addLast(current);
return pathFromLocalStart;
}
} //compute()
} //class SolverTask
@SuppressWarnings("serial")
public class ParallelMaze {
//for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;
/**
* Atomically marks this point as visited unless visited before
* @return whether the point was visited for the first time, i.e. whether it could be marked
*/
boolean visit(Point p) {
return visited[p.getX()][p.getY()].compareAndSet(false, true);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
//Start initial task
long startTime = System.currentTimeMillis();
//since SolverTask.compute() expects its starting point already visited,
//must do that explicitly for the global starting point:
maze.visit(maze.start);
maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
//One solution is enough: Stop all tasks that are still running
pool.shutdownNow();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " +
width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
Vous devez vous connecter pour publier un commentaire.
Il ya des questions sur stackoverflow:
ForkJoinPool stands lors de invokeAll/join
ForkJoinPool semble déchets un thread
J'ai fait un exécutable version dépouillée de ce qui se passe (jvm arguments que j'ai utilisé: -Xms256m -Xmx1024m -Xss8m):
Apparemment lorsque vous effectuez une jointure, thread actuel voit que la tâche requise n'est pas encore achevée, et prend une autre tâche pour lui-même de le faire.
Il arrive dans
java.util.concurrent.ForkJoinWorkerThread#joinTask
.Cependant cette nouvelle tâche génère plus des mêmes tâches, mais ils ne peuvent pas trouver de sujets dans la piscine, parce que les fils sont noués dans la jointure. Et depuis, il n'a aucun moyen de savoir combien de temps il faudra pour eux d'être libéré (thread pourrait être dans une boucle infinie ou dans l'impasse pour toujours), new thread(s) est(sont) a donné naissance (pour Compenser rejoint les threads comme Louis Wasserman mentionné):
java.util.concurrent.ForkJoinPool#signalWork
Donc, pour éviter un tel scénario, vous devez éviter récursive de frai de tâches.
Par exemple, si dans le code ci-dessus vous définissez initiales des paramètres à 1, thread actif, le montant sera de 2, même si vous augmentez childrenCount décuplé.
Également de noter que, tandis que le montant de threads actifs augmente, la quantité de threads en cours d'exécution est inférieur ou égal à parallélisme.
À partir de la source commentaires:
Je pense que ce qui se passe, c'est que vous n'êtes pas la finition de l'une des tâches très rapidement, et comme il n'y a pas de threads disponibles lorsque vous soumettez une nouvelle tâche, un nouveau thread est créé.
stricte, plein stricte, et terminale stricte avoir à faire avec le traitement d'un graphe dirigé acyclique (DAG). Vous pouvez google ces termes pour obtenir une pleine compréhension de ces derniers. C'est le type de traitement le cadre a été conçu pour traiter. Regarde le code de l'API pour les Récursive..., le cadre s'appuie sur votre compute() code pour faire d'autres compute() liens et puis faire un join(). Chaque Tâche un join (), tout comme le traitement d'un DAG.
Vous ne faites pas les DAG de traitement. Vous êtes à la bifurcation de nombreuses nouvelles Tâches et d'attente (join()) sur chaque. À lire dans le code source. C'est très complexe, mais vous pouvez être en mesure de le comprendre. Le cadre ne fait pas de la bonne Gestion des Tâches. Où est-ce qu'il va mettre l'attente de la Tâche quand il fait un join()? Il n'y a pas de file d'attente, qui aurait besoin d'un thread de contrôle de constamment chercher à la file d'attente pour voir ce qui est fini. C'est pourquoi le framework utilise une "continuation " threads". Lorsqu'une tâche ne join() le cadre est en supposant qu'il est en attente pour un seul des Tâches inférieure à la fin. Lorsque de nombreux join() les méthodes sont présents le thread ne peut pas continuer ainsi, une aide ou de maintien thread a besoin d'exister.
Comme indiqué ci-dessus, vous avez besoin d'un scatter-gather type fork-join processus. De là, vous pouvez fourche autant de Tâches
Les deux extraits de code posté par Holger Peine et insaisissable-code ne fait pas suivre la pratique recommandée qui est apparu dans javadoc pour la version 1.8:
Dans les deux cas FJPool a été instancié par le constructeur par défaut. Cela conduit à la construction de la piscine avec asyncMode=false, qui est par défaut:
cette façon de travail de la file d'attente est en fait lifo:
tête -> | t4 | t3 | t2 | t1 | ... | <- queue
Donc dans les extraits qu'ils fork() toutes les tâches en les poussant sur la pile et que
join() dans le même ordre, qui est depuis le plus profond de la tâche (t1) de niveau supérieur (t4) bloque jusqu'à ce qu'un autre thread va voler (t1), puis (t2) et ainsi de suite.
Depuis, il est enouth tâches de bloquer tous les pool de threads (task_count >> piscine.getParallelism()) la rémunération des coups de pied dans comme Louis Wasserman décrit.