Utilisation du module de multitraitement de Python pour exécuter des exécutions de modèle SEAWAT / MODFLOW simultanées et séparées
Je suis en train de terminer 100 modèle fonctionne sur mon processeur 8 64-bit de Windows 7 machine. Je voudrais 7 les instances du modèle simultanément afin de réduire mon temps de fonctionnement total (approx. 9,5 min par modèle). J'ai regardé plusieurs threads concernant le Multitraitement module de Python, mais je suis en manque encore quelque chose.
En utilisant le module multiprocessing
Comment frayer parallèle enfant des processus sur un système multi-processeur?
Python Multitraitement de la file d'attente
Mon Processus:
J'ai 100 différents jeux de paramètres que j'aimerais parcourir SEAWAT/MODFLOW de comparer les résultats. J'ai pré-construit le modèle fichiers d'entrée pour chaque modèle et stockées dans leurs propres répertoires. Ce que je voudrais être en mesure de faire est de 7 modèles en cours d'exécution à un moment jusqu'à ce que toutes les réalisations ont été accomplies. Il n'y a pas besoin de communication entre les processus ou d'affichage des résultats. Pour l'instant j'ai seulement été en mesure de l'apparition des modèles de manière séquentielle:
import os,subprocess
import multiprocessing as mp
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
if f.endswith('.npy'):
files.append(f)
## def work(cmd):
## return subprocess.call(cmd, shell=False)
def run(f,def_param=ws):
real = f.split('_')[2].split('.')[0]
print 'Realization %s' % real
mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
exe = seawatV4x64
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
os.system( exe + swt_nam )
if __name__ == '__main__':
p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
tasks = range(len(files))
results = []
for f in files:
r = p.map_async(run(f), tasks, callback=results.append)
J'ai changé le if __name__ == 'main':
à la suivante, dans l'espoir qu'il serait résoudre le manque de parallélisme qui est pour moi étant implantées sur le script ci-dessus par le for loop
. Cependant, le modèle ne parvient pas à même de les exécuter (pas de Python erreur):
if __name__ == '__main__':
p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
p.map_async(run,((files[f],) for f in range(len(files))))
Toute aide est grandement appréciée!
MODIFIER 3/26/2012 13:31 HNE
En utilisant le "Manuel de la Piscine" méthode @J. F. Sebastian de la réponse ci-dessous-je obtenir l'exécution en parallèle de mon externes .exe. Modèle réalisations sont appelés dans des lots de 8 à un moment, mais il n'attendez pas pour ces 8 pistes pour terminer, avant d'appeler le lot suivant et ainsi de suite:
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
def run(f,ws):
real = f.split('_')[-1].split('.')[0]
print('Realization %s' % real)
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
subprocess.check_call([seawatV4x64, swt_nam])
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
if __name__ == '__main__':
mp.freeze_support() # optional if the program is not frozen
main()
Pas d'erreur traceback est disponible. Le run()
fonction effectue son devoir quand il est appelé un modèle unique réalisation fichier avec plusieurs fichiers. La seule différence est que, avec plusieurs fichiers, il est appelé len(files)
fois bien que chacune de ces instances ferme immédiatement et d'un seul modèle est autorisé à terminer au moment où le script se termine normalement (code de sortie 0).
De l'ajout de certains d'impression des déclarations de main()
révèle quelques informations sur le thread actif-chiffres ainsi que la thread de l'état (notez que ceci est un test sur seulement 8 de la réalisation des fichiers pour faire de la capture d'écran plus faciles à gérer, théoriquement tous les 8 fichiers doivent être exécutés en même temps, mais le problème continue, où ils sont frayer et mourir immédiatement, à une exception près:
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\test')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
print('Active Count a',threading.activeCount())
for _ in threads:
print(_)
q.put_nowait(None) # signal no more files
for t in threads:
print(t)
t.join() # wait for completion
print('Active Count b',threading.activeCount())
**La ligne qui lit "D:\\Data\\Users...
" est l'erreur de l'information jeté quand j'ai arrêter manuellement le modèle d'exécution à l'achèvement. Une fois que j'ai arrêter le modèle en cours d'exécution, le reste fil des lignes d'état signalés et le script se termine.
MODIFIER 3/26/2012 16:24 HNE
SEAWAT permet une exécution simultanée comme je l'ai fait dans le passé, la ponte des instances manuellement à l'aide de iPython et le lancement de chaque modèle de dossier de fichier. Cette fois-ci, j'ai décidé de lancer tous les passages du modèle à partir d'un emplacement unique, à savoir le répertoire où mon script réside. On dirait que le coupable peut être dans la façon dont SEAWAT est de sauver une partie de la sortie. Lorsque SEAWAT est exécuté, il crée immédiatement les fichiers se rapportant à l'exécution du modèle. Un de ces fichiers n'est pas enregistré dans le répertoire dans lequel la réalisation de modèle se trouve, mais dans le répertoire où se trouve le script. C'est la prévention de tout les threads suivants d'enregistrer le même nom de fichier dans le même emplacement (qui ils veulent tous faire depuis ces noms de fichiers sont génériques et non spécifiques à chaque réalisation). Le SEAWAT windows n'étaient pas ouverts assez longtemps pour moi de lire ou même de voir qu'il y a un message d'erreur, je n'ai réalisé cela lorsque je suis revenu en arrière et a essayé d'exécuter du code à l'aide de iPython qui affiche directement l'impression de SEAWAT au lieu d'ouvrir une nouvelle fenêtre pour exécuter le programme.
Je suis accepter @J. F. Sebastian répondre comme il est probable qu'une fois que je résoudre ce modèle exécutable question, l'insertion de code qu'il a fourni obtenez-moi où je dois être.
CODE FINAL
Ajouté la mdc argument en sous-processus.check_call au début de chaque instance de SEAWAT dans son propre répertoire. La clef.
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading
def run(f,ws):
real = f.split('_')[-1].split('.')[0]
print('Realization %s' % real)
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
cwd = ws + r'\reals\real%s\ss' % real
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
if __name__ == '__main__':
mp.freeze_support() # optional if the program is not frozen
main()
source d'informationauteur
Vous devez vous connecter pour publier un commentaire.
Je ne vois pas de calculs dans le code Python. Si vous avez juste besoin d'exécuter plusieurs programmes externes en parallèle, il est suffisant d'utiliser
subprocess
pour exécuter les programmes etthreading
module de maintenir constant le nombre de processus en cours d'exécution, mais le code le plus simple est d'utilisermultiprocessing.Pool
:Si il y a beaucoup de fichiers, puis
pool.map()
pourrait être remplacé parfor _ in pool.imap_unordered(safe_run, files): pass
.Il est également
mutiprocessing.dummy.Pool
qui fournit la même interface quemultiprocessing.Pool
mais utilise des threads au lieu de processus qui pourrait être plus approprié dans ce cas.Vous n'avez pas besoin de garder certains Processeurs gratuit. Il suffit d'utiliser une commande qui démarre votre exécutables avec une priorité faible (sur Linux c'est un
nice
programme).ThreadPoolExecutor
exemplesimultanées.les contrats à terme.ThreadPoolExecutor
serait à la fois simple et suffisant, mais il nécessite De la 3e partie de la dépendance Python 2.x (il est dans la stdlib depuis Python 3.2).Ou si nous ignorer les exceptions soulevées par
run()
:subprocess
+threading
(manuel de la piscine) solutionIci, c'est ma façon à maintenir le minimum x nombre de threads dans la mémoire. C'est une combinaison de filetage et de multitraitement modules. Il peut être inhabituel à d'autres techniques comme le respecté camarade membres ont expliqué ci-dessus, MAIS peut être une valeur considérable. Pour en faciliter l'explication, je suis prise d'un scénario de l'analyse d'un minimum de 5 sites à la fois.
donc, ici, il est:-
Suivant est threadController fonction. Cette fonction contrôle le flux de threads à la mémoire principale. Il gardera l'activation du fils de maintenir la threadNum "minimum" de limiter ie. 5. Aussi il ne sera pas quitter jusqu'à ce que, tous les threads Actifs(acitveCount) sont finis.
Il sera de maintenir un minimum de threadNum(5) startProcess fonction threads (ces fils finira par démarrer le Processus à partir de la processList tout en se joignant à eux avec un temps de 60 secondes). Après avoir regarder threadController, il y aurait 2 fils qui ne sont pas inclus dans le dessus de la limite de 5 ie. le thread Principal et le threadController fil lui-même. c'est pourquoi le filetage.activeCount() != 2 a été utilisé.
startProcess fonction, comme un thread séparé, serait de commencer les Processus de la processlist. Le but de cette fonction (**qui a commencé comme un thread différent), c'est qu'Il deviendrait un parent de thread pour le Processus. Alors, quand Il va se joindre à eux avec un délai d'attente de 60 secondes, ce qui n'empêchera pas le startProcess fil à aller de l'avant, mais cela n'arrêtera pas threadController à effectuer. Ainsi, de cette façon, threadController fonctionnera comme requis.
En plus de maintenir un minimum de nombre de threads dans la mémoire, mon but était d'avoir aussi quelque chose qui pourrait éviter coincé threads ou processus dans la mémoire. Je l'ai fait en utilisant le temps de sortir de la fonction.
Mes excuses pour les faute de frappe.
J'espère que cette construction, à aider quelqu'un dans ce monde.
En ce qui concerne,
Vikas Gautam