Utilisation du module de multitraitement de Python pour exécuter des exécutions de modèle SEAWAT / MODFLOW simultanées et séparées

Je suis en train de terminer 100 modèle fonctionne sur mon processeur 8 64-bit de Windows 7 machine. Je voudrais 7 les instances du modèle simultanément afin de réduire mon temps de fonctionnement total (approx. 9,5 min par modèle). J'ai regardé plusieurs threads concernant le Multitraitement module de Python, mais je suis en manque encore quelque chose.

En utilisant le module multiprocessing

Comment frayer parallèle enfant des processus sur un système multi-processeur?

Python Multitraitement de la file d'attente

Mon Processus:

J'ai 100 différents jeux de paramètres que j'aimerais parcourir SEAWAT/MODFLOW de comparer les résultats. J'ai pré-construit le modèle fichiers d'entrée pour chaque modèle et stockées dans leurs propres répertoires. Ce que je voudrais être en mesure de faire est de 7 modèles en cours d'exécution à un moment jusqu'à ce que toutes les réalisations ont été accomplies. Il n'y a pas besoin de communication entre les processus ou d'affichage des résultats. Pour l'instant j'ai seulement été en mesure de l'apparition des modèles de manière séquentielle:

import os,subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f,def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system( exe + swt_nam )


if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)

J'ai changé le if __name__ == 'main': à la suivante, dans l'espoir qu'il serait résoudre le manque de parallélisme qui est pour moi étant implantées sur le script ci-dessus par le for loop. Cependant, le modèle ne parvient pas à même de les exécuter (pas de Python erreur):

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run,((files[f],) for f in range(len(files))))

Toute aide est grandement appréciée!

MODIFIER 3/26/2012 13:31 HNE

En utilisant le "Manuel de la Piscine" méthode @J. F. Sebastian de la réponse ci-dessous-je obtenir l'exécution en parallèle de mon externes .exe. Modèle réalisations sont appelés dans des lots de 8 à un moment, mais il n'attendez pas pour ces 8 pistes pour terminer, avant d'appeler le lot suivant et ainsi de suite:

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()

    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':

    mp.freeze_support() # optional if the program is not frozen
    main()

Pas d'erreur traceback est disponible. Le run() fonction effectue son devoir quand il est appelé un modèle unique réalisation fichier avec plusieurs fichiers. La seule différence est que, avec plusieurs fichiers, il est appelé len(files) fois bien que chacune de ces instances ferme immédiatement et d'un seul modèle est autorisé à terminer au moment où le script se termine normalement (code de sortie 0).

De l'ajout de certains d'impression des déclarations de main() révèle quelques informations sur le thread actif-chiffres ainsi que la thread de l'état (notez que ceci est un test sur seulement 8 de la réalisation des fichiers pour faire de la capture d'écran plus faciles à gérer, théoriquement tous les 8 fichiers doivent être exécutés en même temps, mais le problème continue, où ils sont frayer et mourir immédiatement, à une exception près:

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    print('Active Count a',threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None) # signal no more files
    for t in threads: 
        print(t)
        t.join() # wait for completion
    print('Active Count b',threading.activeCount())

Utilisation du module de multitraitement de Python pour exécuter des exécutions de modèle SEAWAT /MODFLOW simultanées et séparées

**La ligne qui lit "D:\\Data\\Users..." est l'erreur de l'information jeté quand j'ai arrêter manuellement le modèle d'exécution à l'achèvement. Une fois que j'ai arrêter le modèle en cours d'exécution, le reste fil des lignes d'état signalés et le script se termine.

MODIFIER 3/26/2012 16:24 HNE

SEAWAT permet une exécution simultanée comme je l'ai fait dans le passé, la ponte des instances manuellement à l'aide de iPython et le lancement de chaque modèle de dossier de fichier. Cette fois-ci, j'ai décidé de lancer tous les passages du modèle à partir d'un emplacement unique, à savoir le répertoire où mon script réside. On dirait que le coupable peut être dans la façon dont SEAWAT est de sauver une partie de la sortie. Lorsque SEAWAT est exécuté, il crée immédiatement les fichiers se rapportant à l'exécution du modèle. Un de ces fichiers n'est pas enregistré dans le répertoire dans lequel la réalisation de modèle se trouve, mais dans le répertoire où se trouve le script. C'est la prévention de tout les threads suivants d'enregistrer le même nom de fichier dans le même emplacement (qui ils veulent tous faire depuis ces noms de fichiers sont génériques et non spécifiques à chaque réalisation). Le SEAWAT windows n'étaient pas ouverts assez longtemps pour moi de lire ou même de voir qu'il y a un message d'erreur, je n'ai réalisé cela lorsque je suis revenu en arrière et a essayé d'exécuter du code à l'aide de iPython qui affiche directement l'impression de SEAWAT au lieu d'ouvrir une nouvelle fenêtre pour exécuter le programme.

Je suis accepter @J. F. Sebastian répondre comme il est probable qu'une fois que je résoudre ce modèle exécutable question, l'insertion de code qu'il a fourni obtenez-moi où je dois être.

CODE FINAL

Ajouté la mdc argument en sous-processus.check_call au début de chaque instance de SEAWAT dans son propre répertoire. La clef.

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()

source d'informationauteur