La bonne façon de limiter au maximum le nombre de threads en cours d'exécution à la fois?
J'aimerais créer un programme qui s'exécute de multiples threads, mais il se limite à une constante, le nombre prédéfini de l'exécution de tâches simultanées, comme ceci (mais sans risque de race condition):
import threading
def f(arg):
global running
running += 1
print("Spawned a thread. running=%s, arg=%s" % (running, arg))
for i in range(100000):
pass
running -= 1
print("Done")
running = 0
while True:
if running < 8:
arg = get_task()
threading.Thread(target=f, args=[arg]).start()
Ce qui est le plus sûr/le moyen le plus rapide pour mettre en œuvre cette?
- Il semble que vous décrivez un pool de threads.
- Ne pool de threads implique de stocker les références des fils? J'aimerais le garder aussi léger que possible.
- docs.python.org/2/library/...
- Juste édité la question - qui a remplacé le "CPU" à la "lumière", comme il convient à mon problème plus précisément.
- Il n'importe pas vraiment si une référence à la thread est stocké ou pas, n'est ce pas? Huit références de l'objet ne sont pas va faire ou défaire votre programme.
- J'ai eu la pensée que de les remplacer rapidement pourrait causer une grosse perte de performance.
- BTW, j'ai mis à jour le code pour montrer que j'ai besoin de garder en tirant le fil des arguments.
- Le remplacement d'un objet de référence, par rapport à la surcharge de commencer un nouveau thread, est extrêmement rapide. Avez-vous fait des points de référence, ou vous êtes tout simplement en essayant de deviner ce qui est "lumière" et ce qui ne l'est pas?
- doublon potentiel de stackoverflow.com/questions/1787397/...
Vous devez vous connecter pour publier un commentaire.
Il semble que vous voulez mettre en œuvre le producteur/consommateur avec motif huit travailleurs. Python a un
File
classe et à cette fin, et il n'est pas thread-safe.Chaque travailleur doit appeler
get()
sur la file d'attente pour récupérer une tâche. Cet appel va bloquer si aucune tâche n'est disponible, entraînant le travailleur à aller au ralenti jusqu'à ce que l'on devient disponible. Ensuite, le travailleur doit exécuter la tâche et enfin appelertask_done()
sur la file d'attente.Vous mettrais des tâches dans la file d'attente en appelant
put()
sur la file d'attente.Depuis le thread principal, vous pouvez appeler
join()
sur la file d'attente à attendre jusqu'à ce que toutes les tâches en attente ont été achevés.Cette approche a l'avantage que vous n'êtes pas à la création et à la destruction de threads, ce qui est coûteux. Les threads de travail sera exécuté en continu, mais sera endormi quand pas de tâches dans la file d'attente, à l'aide de zéro de temps PROCESSEUR.
(Liés à la documentation de la page est un exemple de ce modèle.)
sémaphore est une variable ou d'un type abstrait de données qui est utilisée pour contrôler l'accès à une ressource commune par plusieurs processus en même temps une système comme un multiprogramming système d'exploitation, ce qui peut vous aider ici.
De cette façon, vous pouvez facilement limiter le nombre de threads qui seront exécutées en parallèle au cours de l'exécution du programme. Variable "maximumNumberOfThreads' peut être utilisé pour définir une limite supérieure à la valeur maximum de threads.
crédits
Il serait beaucoup plus facile à mettre en œuvre la présente comme un fil de la piscine ou à l'exécuteur testamentaire, à l'aide
multiprocessing.dummy.Pool
, ouconcurrent.futures.ThreadPoolExecutor
(ou, si l'utilisation de Python 2.x, le backportterme
). Par exemple:Bien sûr, si vous pouvez modifier le modèle d'extraction
get_task
à un push-modèleget_tasks
que, par exemple, les rendements des tâches une à la fois, c'est encore plus simple:Lorsque vous exécutez des tâches (par exemple,
get_task
soulève une exception, ouget_tasks
fonctionne à sec), ce sera automatiquement dire que l'exécuteur de s'arrêter après il draine la file d'attente, attendre qu'il s'arrête, et nettoyer tout.threading
etQueue
?multiprocessing
etconcurrent.futures
modules le code source, ils ne sont pas si compliqué que ça. Ou n'importe quel nombre de tutoriels. Mais pourquoi voulez-vous construire votre propre piscine de mise en œuvre lorsque vous avez déjà une bonne parfaitement l'un dans l'stdlib?multiprocessing
simule les discussions avec les processus enfants, puis génère des fonctionnalités supplémentaires (comme les piscines, explicite données partagées, etc.) sur le dessus de cela, et aussi (dansmultiprocessing.dummy
) fournit les mêmes fonctionnalités supplémentaires pour les threads. (Pas idéal stdlib design, mais des raisons historiques...)futures
s'exécute au-dessus dethreading
oumultiprocessing
(selon l'exécuteur vous utilisez), qui fournit la même interface à l'autre.ThreadPoolExecutor
ne fonctionne pas avec la méthode d'instance commeself.xxxx
?J'ai vu que le plus souvent écrit comme:
Si vous voulez maintenir fixe la taille du pool de threads en cours d'exécution que les processus de courte durée des tâches que de demander de nouveaux travaux, envisager une solution construite autour de Files d'attente, comme "Comment faire pour attendre jusqu'à ce que seulement le premier thread est terminé en Python".
J'ai rencontré ce même problème et a passé des jours (2 jours pour être précis) arriver à la bonne solution à l'aide d'une file d'attente. J'ai perdu un jour de descendre le ThreadPoolExecutor chemin, car il n'y a aucun moyen de limiter le nombre de threads que chose lance! Je nourris une liste de 5000 fichiers à copier et le code non réactif, une fois qu'il a obtenu jusqu'à environ 1500 simultanées des copies de fichiers, exécution de tous à la fois. Le max_workers paramètre sur la ThreadPoolExecutor contrôle uniquement la façon dont beaucoup de travailleurs sont en rotation fils pas combien de fils se lancé.
Ok, de toute façon, voici un exemple très simple de l'utilisation d'une File d'attente pour cette:
time.sleep(random.randint(1, 10))
depuis le q.get() doit bloquer jusqu'à ce qu'il y a quelque chose dans la file d'attenteThreadPoolExecutor(max_workers=10)
ou20
ou30
etcPour appliquer limitation sur fil de la création, de suivre cet exemple (il fonctionne vraiment):
Ou:
Une autre façon de définir un nombre de thread vérificateur de mutex/verrouillage comme l'exemple ci-dessous: