Ce qui détermine le nombre de threads Java ForkJoinPool crée?

Autant que j'avais compris ForkJoinPool, que la piscine crée un nombre fixe de threads (par défaut: nombre de cœurs) et ne pourra jamais créer plusieurs threads (à moins que l'application indique une nécessité pour celles-ci en utilisant managedBlock).

Cependant, l'utilisation de ForkJoinPool.getPoolSize() j'ai découvert que dans un programme qui crée de 30 000 tâches (RecursiveAction), le ForkJoinPool de l'exécution de ces tâches utilise 700 threads en moyenne (fils comptés chaque fois qu'une tâche est créée). Les tâches ne font pas I/O, mais pur calcul; la seule inter-tâche de synchronisation est l'appel de ForkJoinTask.join() et l'accès AtomicBooleans, i.e. il n'y a pas de fil-opérations de blocage.

Depuis join() ne pas bloquer le thread appelant comme je le comprends, il n'y a aucune raison pour qu'un thread dans la piscine doit jamais bloquer, et donc (j'avais supposé) il devrait y avoir aucune raison de créer toute autre fils (qui est de toute évidence passe quand même).

Alors, pourquoi ne ForkJoinPool créer autant de fils? Quels facteurs déterminent le nombre de threads créés?

J'avais espéré que cette question pourrait être répondu sans poster du code, mais ici, il s'agit à la demande. Ce code est un extrait d'un programme de quatre fois la taille, réduite à l'essentiel des parties; il ne compile pas car il est. Si vous le souhaitez, je peux bien sûr post le programme complet, trop.

Le programme recherche un labyrinthe pour un chemin d'accès à partir d'un point de départ à un point de fin en utilisant la profondeur de la première recherche. Une solution de l'existence est garantie. La logique principale est dans le compute() méthode de SolverTask: Un RecursiveAction qui commence à un certain point donné et se poursuit avec tous les points voisins accessible à partir du point courant. Plutôt que de créer un nouveau SolverTask à chaque point d'embranchement (qui permettrait de créer de trop nombreuses tâches), il pousse tous ses voisins sauf sur un retour en arrière de la pile à être traitées plus tard et se poursuit avec seulement l'un des voisins ne les pousse pas à la pile. Une fois qu'il atteint une impasse de cette façon, le point le plus récemment poussé à la mandature de la pile est sortie, et la recherche se poursuit à partir de là (en coupant le chemin construit à partir de la tâche du point de départ en conséquence). Une nouvelle tâche est créée une fois qu'une tâche trouve sa mandature de la pile supérieure à un certain seuil; à partir de ce moment, la tâche, tout en continuant à la pop de sa mandature de la pile jusqu'à ce que, épuisé, ne va pas pousser les autres points à sa pile lors de l'atteinte d'un point d'embranchement, mais de créer une nouvelle tâche pour chaque point. Ainsi, la taille des tâches peut être ajustée à l'aide de la pile de seuil limite.

Les chiffres que j'ai cités ci-dessus ("tâches de 30 000, 700 threads en moyenne") qui sont à la recherche d'un labyrinthe de 5000x5000 cellules. Alors, voici l'essentiel de code:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
//Once the backtrack stack has reached this size, the current task
//will never add another cell to it, but create a new task for each
//newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;
/**
* @return Tries to compute a path through the maze from local start to end
* and returns that (or null if no such path found)
*/
@Override
public ArrayDeque<Point>  compute() {
//Is this task still accepting new branches for processing on its own,
//or will it create new tasks to handle those?
boolean stillAcceptingNewBranches = true;
Point current = localStart;
ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  //Path from localStart to (including) current
ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
//Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
Direction[] allDirections = Direction.values();
while (!current.equals(end)) {
pathFromLocalStart.addLast(current);
//Collect current's unvisited neighbors in random order: 
ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
for (Direction directionToNeighbor: allDirections) {
Point neighbor = current.getNeighbor(directionToNeighbor);
//contains() and hasPassage() are read-only methods and thus need no synchronization
if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
}
//Process unvisited neighbors
if (neighborsToVisit.size() == 1) {
//Current node is no branch: Continue with that neighbor
current = neighborsToVisit.getFirst().getPoint();
continue;
}
if (neighborsToVisit.size() >= 2) {
//Current node is a branch
if (stillAcceptingNewBranches) {
current = neighborsToVisit.removeLast().getPoint();
//Push all neighbors except one on the backtrack stack for later processing
for(PointAndDirection neighborAndDirection: neighborsToVisit) 
backtrackStack.push(neighborAndDirection);
if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
stillAcceptingNewBranches = false;
//Continue with the one neighbor that was not pushed onto the backtrack stack
continue;
} else {
//Current node is a branch point, but this task does not accept new branches any more: 
//Create new task for each neighbor to visit and wait for the end of those tasks
SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
int t = 0;
for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
task.fork();
subTasks[t++] = task;
}
for (SolverTask task: subTasks) {
ArrayDeque<Point> subTaskResult = null;
try {
subTaskResult = task.join();
} catch (CancellationException e) {
//Nothing to do here: Another task has found the solution and cancelled all other tasks
}
catch (Exception e) {
e.printStackTrace();
}
if (subTaskResult != null) { //subtask found solution
pathFromLocalStart.addAll(subTaskResult);
//No need to wait for the other subtasks once a solution has been found
return pathFromLocalStart;
}
} //for subTasks
} //else (not accepting any more branches) 
} //if (current node is a branch)
//Current node is dead end or all its neighbors lead to dead ends:
//Continue with a node from the backtracking stack, if any is left:
if (backtrackStack.isEmpty()) {
return null; //No more backtracking avaible: No solution exists => end of this task
}
//Backtrack: Continue with cell saved at latest branching point:
PointAndDirection pd = backtrackStack.pop();
current = pd.getPoint();
Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
//DEBUG System.out.println("Backtracking to " +  branchingPoint);
//Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
//DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
pathFromLocalStart.removeLast();
}
//continue while loop with newly popped current
} //while (current ...
if (!current.equals(end)) {         
//this task was interrupted by another one that already found the solution 
//and should end now therefore:
return null;
} else {
//Found the solution path:
pathFromLocalStart.addLast(current);
return pathFromLocalStart;
}
} //compute()
} //class SolverTask
@SuppressWarnings("serial")
public class ParallelMaze  {
//for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;
/**
* Atomically marks this point as visited unless visited before
* @return whether the point was visited for the first time, i.e. whether it could be marked
*/
boolean visit(Point p) {
return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
//Start initial task
long startTime = System.currentTimeMillis();
//since SolverTask.compute() expects its starting point already visited, 
//must do that explicitly for the global starting point:
maze.visit(maze.start);
maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
//One solution is enough: Stop all tasks that are still running
pool.shutdownNow();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
InformationsquelleAutor Holger Peine | 2012-05-29