Python multitraitement piscine se bloque à la rejoindre?
Je suis en train d'exécuter du code python sur plusieurs fichiers en parallèle. Le concept est fondamentalement:
def process_file(filename, foo, bar, baz=biz):
# do stuff that may fail and cause exception
if __name__ == '__main__':
# setup code setting parameters foo, bar, and biz
psize = multiprocessing.cpu_count()*2
pool = multiprocessing.Pool(processes=psize)
map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
pool.close()
pool.join()
J'ai déjà utilisé la piscine.carte pour faire quelque chose de semblable et il fonctionnait très bien, mais je n'arrive pas à l'utiliser ici, parce que la piscine.la carte n'est pas (semble) permettez-moi de passer en arguments supplémentaires (et l'utilisation de lambda à faire, ça ne marchera pas parce que lambda ne peut pas être classé).
Alors maintenant j'essaie de faire des choses à travailler à l'aide de apply_async() directement. Mon problème est que le code semble se bloquer et de ne jamais quitter. Un peu de fichiers échoue avec une exception, mais je ne vois pas pourquoi ce serait la cause de se joindre à l'échec/de blocage? Il est intéressant de noter que si aucun des fichiers échoue avec une exception, il n'terminer proprement.
Ce qui me manque?
Edit: Quand la fonction (et donc d'un travailleur) échoue, je vois cette exception:
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())
Si je vois encore un de ces, le processus parent processus se bloque jamais, jamais récolter les enfants et sortant.
- Votre code semble fonctionner très bien, même si je jette aléatoire des exceptions dans
process_file
. Alors peut-être que cela a à voir avec ce que vous faites réellement dansprocess_file
qui est à l'origine des problèmes. - Hein. quelle version de python? Je suis sur 2.7. process_file dans le programme réel est assez complexe, fait un usage intensif de la LIP, NetworkX, poly2tri, et d'autres bibliothèques. Je connais au moins 2 endroits que j'ai connu des bugs qui peuvent causer des exceptions dans certains cas, mais j'ai besoin de tout simplement ignorer ces erreurs et de progresser. Je suis perplexe quant à pourquoi il ne serait jamais sortie pour moi mais le travail pour vous.
- 2.7.2, c'est ce que j'ai testé avec: gist.github.com/robertklep/5125319
- Qui ressemble certainement raisonnable de cas de test, et il fonctionne très bien sur mon système. Maintenant, je suis complètement perdu.
- Commencez-vous les sous-processus de l'intérieur de vos travailleurs?
- oui. est-ce un problème?
- Je viens aussi vu ceci: bugs.python.org/issue9400
- Droit, qui est très similaire.
Vous devez vous connecter pour publier un commentaire.
Désolé de répondre à ma propre question, mais j'ai trouvé au moins une solution de contournement si au cas où quelqu'un d'autre a un problème similaire, j'ai envie de le poster ici. Je vais accepter toutes les meilleures réponses là-bas.
Je crois que la racine du problème est http://bugs.python.org/issue9400 . Cela me dit deux choses:
Dans mon cas, mon travailleur de la fonction a été le lancement d'un sous-processus qui a été segfaulting. Cette retourné CalledProcessError exception, ce qui n'est pas pickleable. Pour une raison quelconque, ce qui rend la piscine de l'objet dans le parent de sortir pour le déjeuner et pas de retour de l'appel de join().
Dans mon cas particulier, je n'aime pas ce que l'exception a été. Au plus je veux l'enregistrer et continuer. Pour ce faire, j'ai simplement envelopper mon top travailleur de la fonction dans un try/except clause. Si le travailleur lance une exception, il est capturé avant d'essayer de revenir au processus parent, connecté, puis le processus de travail s'arrête normalement car il n'est plus d'essayer d'envoyer l'exception à travers. Voir ci-dessous:
Puis, j'ai ma carte initiale de l'appel de fonction process_file_wrapped() à la place de celui d'origine. Maintenant, mon code fonctionne comme prévu.
Vous pouvez en fait utiliser une
functools.partial
instance au lieu d'unlambda
dans les cas où l'objet doit être nettoyée.partial
objets sont pickleable depuis Python 2.7 et Python 3).pool.map
, alors peut-être que cela aidera.functools.partial()
vous permettra d'utiliserPool.map()
en plus de scénarios, mais il n'affecte pas le problème d'exception. Je viens de tomber sur le même problème avecCalledProcessError
la pendaison de la piscine. La capture, l'enregistrement et l'éducation d'unRuntimeError
a fonctionné pour moi.Pour ce que ça vaut, j'ai eu un bug similaire (pas le même) quand
pool.map
accroché. Mon cas d'utilisation m'a permis d'utiliser piscine.mettre fin à de le résoudre (assurez-vous que le vôtre n'est que bien avant de changer de stuff).J'ai utilisé piscine.carte avant d'appeler
terminate
donc je sais que tout a été fini, à partir de la docs:Si c'est votre cas d'utilisation, ce peut être une façon de le patcher.