La meilleure façon d'effectuer le traitement multiple dans les demandes avec le python Tornade serveur?

Je suis en utilisant des e/S non bloquantes python serveur Tornade. J'ai une classe de GET les demandes qui peuvent prendre beaucoup de temps pour le terminer (pense que dans la gamme de 5 à 10 secondes). Le problème est que la Tornade blocs sur ces demandes suivantes rapide des demandes sont maintenus en place jusqu'à ce que la lenteur de la demande complète.

J'ai regardé: https://github.com/facebook/tornado/wiki/Threading-and-concurrency et est venu à la conclusion que je voulais une combinaison de #3 (d'autres processus) et #4 (les autres threads). #4 sur son propre eu des problèmes et j'ai été incapable d'obtenir un contrôle fiable de retour à la ioloop quand il y avait un autre thread à faire le "heavy_lifting". (Je suppose que c'était dû à la GIL et le fait que le heavy_lifting tâche a charge élevée de l'UC et continue tirant de contrôle à distance de la principale ioloop, mais c'est une supposition).

J'ai donc été prototypage comment résoudre ce problème en faisant des "gros travaux" tâches au sein de ces lentes GET demandes dans un processus séparé et un rappel de retour dans la Tornade ioloop lorsque le processus est terminé pour terminer la demande. Cela libère de la ioloop pour traiter d'autres demandes.

J'ai créé un exemple simple illustrant une solution possible, mais je suis curieux d'avoir des retours de la communauté sur elle.

Ma question est double: Comment cela peut-il l'approche actuelle d'être simplifié? Quels sont les pièges potentiellement exister avec elle?

L'Approche

  1. Utiliser Tornade builtin asynchronous décorateur qui permet à une demande de rester ouvert et de la ioloop pour continuer.

  2. Frayer un processus distinct pour les "gros travaux" tâches à l'aide de python multiprocessing module. J'ai d'abord tenté d'utiliser le threading module, mais n'a pas pu obtenir toute la fiabilité de renoncer à de contrôle le ioloop. Il apparaît également que mutliprocessing permettront de profiter des multicores.

  3. Commencer un "observateur" fil dans les principales ioloop processus à l'aide de la threading module dont le travail est de regarder un multiprocessing.Queue pour les résultats du "gros œuvre" de la tâche quand il se termine. Cela était nécessaire parce que j'avais besoin de savoir que la heavy_lifting tâche avait terminé tout en étant en mesure de toujours aviser le ioloop que cette demande est maintenant terminé.

  4. Assurez-vous que le "guetteur" fil abandonne le contrôle de la principale ioloop boucle souvent avec time.sleep(0) appels afin que d'autres demandes continuent d'obtenir facilement traitées.

  5. Quand il y a un résultat dans la file d'attente puis ajouter un rappel de la "watcher" thread à l'aide de tornado.ioloop.IOLoop.instance().add_callback() qui est documenté pour être le seul moyen sûr d'appel ioloop cas des autres threads.

  6. Assurez-vous d'appeler ensuite finish() dans le rappel pour remplir la demande et à la main sur une réponse.

Ci-dessous est un exemple de code montrant cette approche. multi_tornado.py est le serveur de mise en œuvre de ce qui précède, et call_multi.py est un exemple de script qui appelle le serveur de deux façons différentes pour tester le serveur. Les deux tests d'appeler le serveur avec 3 lent GET demandes de suivi de 20 rapide GET demandes. Les résultats sont présentés pour les deux cours d'exécution avec et sans le threading.

Dans le cas de l'exécution avec "pas de filetage" les 3 demandes lentes (bloc de chaque de prendre un peu plus d'une seconde pour terminer). Quelques-uns des 20 rapide des demandes faufiler entre certains de la lenteur de la demande dans le ioloop (pas totalement sûr de comment cela se produit - mais il pourrait être un artefact que je suis en cours d'exécution à la fois le serveur et le client script de test sur la même machine). Le point ici est que tous les rapide, les demandes sont à des degrés divers.

Dans le cas de l'exécution avec threading activé le 20 rapide des requêtes de la première immédiatement et les trois demandes lentes complète à peu près au même moment par la suite qu'ils ont été exécutés en parallèle. C'est le comportement souhaité. Les trois lent demandes de 2,5 secondes pour terminer en parallèle, alors que dans le non filetée cas, les trois demandes lentes prendre environ 3,5 secondes au total. Donc, il y a environ 35% de la vitesse maximum de l'ensemble (je suppose en raison de multicœur de partage). Mais le plus important - de la rapide des demandes ont été immédiatement traitées en leu de les lentes.

Je n'ai pas beaucoup d'expérience avec la programmation multithread - alors que cette apparence travaille ici, je suis curieux de savoir:

Est-il un moyen plus simple pour accomplir cette? Ce monstre peuvent se cacher à l'intérieur de cette approche?

(Note: Un avenir compromis peut être de simplement exécuter plusieurs instances de Tornade avec un reverse proxy comme nginx faire de l'équilibrage de la charge. Peu importe ce que je vais être l'exécution de plusieurs instances avec un équilibreur de charge - mais je suis inquiet au sujet de simplement jeter le matériel à ce problème, puisqu'il semble que le matériel est directement couplé au problème en termes de blocage.)

Exemple De Code

multi_tornado.py (serveur exemple):

import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0)  # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note:  This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0)  # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish()   # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()

call_multi.py (client testeur):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)

Résultats De Test

En exécutant python call_multi.py slow (le comportement de blocage):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

En exécutant python call_multi.py slow_threaded (le comportement désiré):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
  • Recommandation: attention pour le mur de texte.
  • OK. Des Suggestions? N'est-ce pas clair pour moi de communiquer tous les détails de ce qui se passe beaucoup plus concise.
  • Elle est généralement préférable de poser des questions comme celle-ci dans plusieurs petites. Mais, j'ai peut-être tort. Alors... est-ce votre seule question, comment simplifier cela? J'en ai mis que dans le haut - plus intéressant.
  • Je suis à la recherche de la simplification ou de l'autre approche. J'ai édité la question légèrement afin de mettre un peu plus de ce que je suis à la recherche d'avant.
  • Il me semble que vous pouvez simplifier ce à l'aide d'une file d'attente de demandes qui alimente un processus de piscine tels que celui trouvé dans le module multiprocessing. Voir docs.python.org/2/library/... pour plus d'info.
  • curieux, comment avez-vous résoudre ce problème? J'ai le même problème 🙂 Merci!
  • J'ai été très occupé avec d'autres aspects du projet - de sorte que je n'ai pas fait plus encore - mais le plan sur l'exécution ci-dessus MultithreadedHandler. J'ai vraiment pas vu d'autres commentaires sur d'autres approches qui utilisent la Tornade.

InformationsquelleAutor Rocketman | 2013-03-13