Attendez pour un seul RabbitMQ message avec un délai d'attente
Je voudrais envoyer un message à un serveur RabbitMQ et puis attendre un message de réponse (sur un "reply-to" de la file d'attente). Bien sûr, je ne veux pas attendre éternellement dans le cas où l'application de traitement de ces messages est vers le bas - il doit y avoir un délai d'attente. Il sonne comme une tâche de base, mais je ne peux pas trouver un moyen de le faire. Maintenant, j'ai rencontré ce problème avec les deux py-amqplib et la RabbitMQ .NET client.
La meilleure solution que j'ai obtenu jusqu'à présent est d'un sondage à l'aide de basic_get
avec sleep
entre-deux, mais c'est assez moche:
def _wait_for_message_with_timeout(channel, queue_name, timeout):
slept = 0
sleep_interval = 0.1
while slept < timeout:
reply = channel.basic_get(queue_name)
if reply is not None:
return reply
time.sleep(sleep_interval)
slept += sleep_interval
raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
Il y a sûrement quelque meilleure façon de faire?
Vous devez vous connecter pour publier un commentaire.
J'ai juste ajouté délai de soutien pour
amqplib
danscarrot
.C'est une sous-classe de
amqplib.client0_8.Connection
:http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97
wait_multi
est une version dechannel.wait
en mesure de recevoir sur un nombre arbitrairede canaux.
Je suppose que cela pourrait être fusionné en amont à un certain point.
Voici ce que j'ai fait dans la .NET client:
Malheureusement, je ne peux pas faire la même chose avec py-amqplib, parce que son
basic_consume
méthode ne permet pas d'appeler la fonction de rappel, à moins que vous appelezchannel.wait()
etchannel.wait()
ne supporte pas les délais d'attente! Ce stupide limitation (qui je continuer à courir dans) signifie que si vous ne recevez jamais un autre message de votre fil est figé à jamais.Il y a un exemple ici à l'aide de qpid avec un
msg = q.get(timeout=1)
qui devrait faire ce que vous voulez. Désolé, je ne sais pas ce que les autres AMQP bibliothèques client de mettre en œuvre les délais (et, en particulier, je ne connais pas les deux de celles que tu as mentionné).basic_consume
avec une file d'attente et d'attente dans la file d'attente avec un délai d'attente. Dirait que je vais avoir à faire.Cela semble casser l'idée d'un traitement asynchrone, mais si vous devez, je pense que la bonne façon de le faire est d'utiliser un RpcClient.
QueueingBasicConsumer
et d'attendre sur sa file d'attente, qui prend en charge un délai d'attente. Ce n'est pas aussi complexe .NET comme je le craignais.Lapin maintenant vous permet d'ajouter des événements de délai d'expiration. Enroulez simplement votre code dans un try catch et ensuite lancer des exceptions dans le Délai et de la Déconnexion des gestionnaires: