La meilleure façon d'effectuer le traitement multiple dans les demandes avec le python Tornade serveur?
Je suis en utilisant des e/S non bloquantes python serveur Tornade. J'ai une classe de GET
les demandes qui peuvent prendre beaucoup de temps pour le terminer (pense que dans la gamme de 5 à 10 secondes). Le problème est que la Tornade blocs sur ces demandes suivantes rapide des demandes sont maintenus en place jusqu'à ce que la lenteur de la demande complète.
J'ai regardé: https://github.com/facebook/tornado/wiki/Threading-and-concurrency et est venu à la conclusion que je voulais une combinaison de #3 (d'autres processus) et #4 (les autres threads). #4 sur son propre eu des problèmes et j'ai été incapable d'obtenir un contrôle fiable de retour à la ioloop quand il y avait un autre thread à faire le "heavy_lifting". (Je suppose que c'était dû à la GIL et le fait que le heavy_lifting tâche a charge élevée de l'UC et continue tirant de contrôle à distance de la principale ioloop, mais c'est une supposition).
J'ai donc été prototypage comment résoudre ce problème en faisant des "gros travaux" tâches au sein de ces lentes GET
demandes dans un processus séparé et un rappel de retour dans la Tornade ioloop lorsque le processus est terminé pour terminer la demande. Cela libère de la ioloop pour traiter d'autres demandes.
J'ai créé un exemple simple illustrant une solution possible, mais je suis curieux d'avoir des retours de la communauté sur elle.
Ma question est double: Comment cela peut-il l'approche actuelle d'être simplifié? Quels sont les pièges potentiellement exister avec elle?
L'Approche
-
Utiliser Tornade builtin
asynchronous
décorateur qui permet à une demande de rester ouvert et de la ioloop pour continuer. -
Frayer un processus distinct pour les "gros travaux" tâches à l'aide de python
multiprocessing
module. J'ai d'abord tenté d'utiliser lethreading
module, mais n'a pas pu obtenir toute la fiabilité de renoncer à de contrôle le ioloop. Il apparaît également quemutliprocessing
permettront de profiter des multicores. -
Commencer un "observateur" fil dans les principales ioloop processus à l'aide de la
threading
module dont le travail est de regarder unmultiprocessing.Queue
pour les résultats du "gros œuvre" de la tâche quand il se termine. Cela était nécessaire parce que j'avais besoin de savoir que la heavy_lifting tâche avait terminé tout en étant en mesure de toujours aviser le ioloop que cette demande est maintenant terminé. -
Assurez-vous que le "guetteur" fil abandonne le contrôle de la principale ioloop boucle souvent avec
time.sleep(0)
appels afin que d'autres demandes continuent d'obtenir facilement traitées. -
Quand il y a un résultat dans la file d'attente puis ajouter un rappel de la "watcher" thread à l'aide de
tornado.ioloop.IOLoop.instance().add_callback()
qui est documenté pour être le seul moyen sûr d'appel ioloop cas des autres threads. -
Assurez-vous d'appeler ensuite
finish()
dans le rappel pour remplir la demande et à la main sur une réponse.
Ci-dessous est un exemple de code montrant cette approche. multi_tornado.py
est le serveur de mise en œuvre de ce qui précède, et call_multi.py
est un exemple de script qui appelle le serveur de deux façons différentes pour tester le serveur. Les deux tests d'appeler le serveur avec 3 lent GET
demandes de suivi de 20 rapide GET
demandes. Les résultats sont présentés pour les deux cours d'exécution avec et sans le threading.
Dans le cas de l'exécution avec "pas de filetage" les 3 demandes lentes (bloc de chaque de prendre un peu plus d'une seconde pour terminer). Quelques-uns des 20 rapide des demandes faufiler entre certains de la lenteur de la demande dans le ioloop (pas totalement sûr de comment cela se produit - mais il pourrait être un artefact que je suis en cours d'exécution à la fois le serveur et le client script de test sur la même machine). Le point ici est que tous les rapide, les demandes sont à des degrés divers.
Dans le cas de l'exécution avec threading activé le 20 rapide des requêtes de la première immédiatement et les trois demandes lentes complète à peu près au même moment par la suite qu'ils ont été exécutés en parallèle. C'est le comportement souhaité. Les trois lent demandes de 2,5 secondes pour terminer en parallèle, alors que dans le non filetée cas, les trois demandes lentes prendre environ 3,5 secondes au total. Donc, il y a environ 35% de la vitesse maximum de l'ensemble (je suppose en raison de multicœur de partage). Mais le plus important - de la rapide des demandes ont été immédiatement traitées en leu de les lentes.
Je n'ai pas beaucoup d'expérience avec la programmation multithread - alors que cette apparence travaille ici, je suis curieux de savoir:
Est-il un moyen plus simple pour accomplir cette? Ce monstre peuvent se cacher à l'intérieur de cette approche?
(Note: Un avenir compromis peut être de simplement exécuter plusieurs instances de Tornade avec un reverse proxy comme nginx faire de l'équilibrage de la charge. Peu importe ce que je vais être l'exécution de plusieurs instances avec un équilibreur de charge - mais je suis inquiet au sujet de simplement jeter le matériel à ce problème, puisqu'il semble que le matériel est directement couplé au problème en termes de blocage.)
Exemple De Code
multi_tornado.py
(serveur exemple):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py
(client testeur):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)
Résultats De Test
En exécutant python call_multi.py slow
(le comportement de blocage):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
En exécutant python call_multi.py slow_threaded
(le comportement désiré):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
- Recommandation: attention pour le mur de texte.
- OK. Des Suggestions? N'est-ce pas clair pour moi de communiquer tous les détails de ce qui se passe beaucoup plus concise.
- Elle est généralement préférable de poser des questions comme celle-ci dans plusieurs petites. Mais, j'ai peut-être tort. Alors... est-ce votre seule question, comment simplifier cela? J'en ai mis que dans le haut - plus intéressant.
- Je suis à la recherche de la simplification ou de l'autre approche. J'ai édité la question légèrement afin de mettre un peu plus de ce que je suis à la recherche d'avant.
- Il me semble que vous pouvez simplifier ce à l'aide d'une file d'attente de demandes qui alimente un processus de piscine tels que celui trouvé dans le module multiprocessing. Voir docs.python.org/2/library/... pour plus d'info.
- curieux, comment avez-vous résoudre ce problème? J'ai le même problème 🙂 Merci!
- J'ai été très occupé avec d'autres aspects du projet - de sorte que je n'ai pas fait plus encore - mais le plan sur l'exécution ci-dessus MultithreadedHandler. J'ai vraiment pas vu d'autres commentaires sur d'autres approches qui utilisent la Tornade.
Vous devez vous connecter pour publier un commentaire.
Si vous êtes prêt à utiliser
simultanées.les contrats à terme.ProcessPoolExecutor
au lieu demultiprocessing
, c'est en fait très simple. Tornade ioloop prend déjà en chargeconcurrent.futures.Future
, afin qu'ils jouent bien ensemble la sortie de la boîte.concurrent.futures
est inclus dans Python 3.2+, et a été intégré à Python 2.x.Voici un exemple:
De sortie:
ProcessPoolExecutor
a un nombre plus limité d'API quemultiprocessing.Pool
, mais si vous n'avez pas besoin de fonctionnalités plus avancées demultiprocessing.Pool
, c'est utile parce que l'intégration est beaucoup plus simple.ThreadPoolExecutor
et suivent le même modèle. Vous n'avez pas besoinProcessPoolExecutor
parce que l'accès à un DB de blocage I/O, ce qui devrait libérer le GIL.multiprocessing.Pool
peut être intégré dans letornado
I/O loop, mais c'est un peu brouillon. Un beaucoup plus propre d'intégration peut être fait en utilisantconcurrent.futures
(voir mon autre réponse pour plus de détails), mais si vous êtes coincé sur Python 2.x et ne peut pas installer lesconcurrent.futures
backport, voici comment vous pouvez le faire strictement à l'aide demultiprocessing
:La
multiprocessing.Pool.apply_async
etmultiprocessing.Pool.map_async
méthodes, les deux ont une option decallback
paramètre, ce qui signifie que les deux peuvent potentiellement être branché dans unetornado.gen.Task
. Donc, dans la plupart des cas, l'exécution de code asynchrone dans un sous-processus est aussi simple que cela:Comme je l'ai mentionné, cela fonctionne bien dans le plus cas. Mais si
worker()
déclenche une exception,callback
n'est jamais appelée, ce qui signifie que legen.Task
ne se termine jamais, et vous accrocher à jamais. Maintenant, si vous savez que votre travail ne sera jamais lever une exception (parce que vous avez enveloppé le tout dans untry
/except
, par exemple), vous vous ferez un plaisir d'utiliser cette approche. Toutefois, si vous voulez laisser les exceptions s'échapper du travailleur, la seule solution que j'ai trouvé était à la sous-classe certains multitraitement composants, et de faire appelcallback
même si le travailleur sous-processus a soulevé une exception:Avec ces changements, l'objet de l'exception sera retournée par la
gen.Task
, plutôt que de lagen.Task
suspendu indéfiniment. J'ai aussi mis à jour monasync_run
méthode de ré-augmenter l'exception lors de son retour, avaient fait d'autres modifications à fournir de meilleurs retraçage pour les exceptions levées dans le travailleur sous-processus. Voici le code complet:Voici comment il se comporte pour le client:
Et si j'envoie simultanément deux curl demandes, nous pouvons voir qu'ils sont manipulés de façon asynchrone sur le côté serveur:
Edit:
Notez que ce code devient plus simple avec Python 3, parce qu'elle introduit une
error_callback
argument mot-clé à tous asynchronemultiprocessing.Pool
méthodes. Cela le rend beaucoup plus facile à intégrer avec Tornade:Tout ce que nous devons faire dans notre substituée
apply_async
est d'appeler le parent leerror_callback
argument mot-clé, en plus de lacallback
kwarg. Pas besoin de surchargerApplyResult
.Nous pouvons obtenir même éleveur en utilisant une Métaclasse dans notre
TornadoPool
, pour permettre à son*_async
méthodes à appeler directement comme s'ils étaient des coroutines:Si vos demandes sont à prendre qu'une tornade est le bon cadre.
Je vous suggère d'utiliser nginx pour acheminer le jeûne devient à la tornade et les plus lents, vers un autre serveur.
PeterBe a un article intéressant où il dirige plusieurs Tornade des serveurs et des jeux de l'un d'entre eux d'être "le lent" pour gérer les demandes en cours d'exécution, voir: inquiétant-sur-io-blocage je voudrais essayer cette méthode.