RabbitMQ C# pilote cesse de recevoir des messages
Avez-vous des pointeurs de la façon de déterminer quand un abonnement problème a eu lieu donc je ne peux reconnecter?
Mon service utilise RabbitMQ.Client.MessagePatterns.Abonnement pour un abonnement. Après un certain temps, mon client silencieusement cesse de recevoir des messages. Je soupçonne des problèmes de réseau que j'notre connexion VPN n'est pas le plus fiable.
J'ai lu dans les docs pour un certain temps à la recherche d'une clé pour savoir quand cet abonnement peut être rompu en raison d'un problème de réseau sans beaucoup de chance. J'ai essayé de vérifier que la connexion et le canal sont encore ouvertes, mais il semble toujours à signaler qu'il est toujours ouvert.
Les messages qu'il n'processus de travail assez bien et sont reconnus en arrière de la file d'attente, donc je ne pense pas que c'est un problème avec le "ack".
Je suis sûr que je dois être en manque juste quelque chose de simple, mais je n'ai pas encore trouvé.
public void Run(string brokerUri, Action<byte[]> handler)
{
log.Debug("Connecting to broker: {0}".Fill(brokerUri));
ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, true, false, false, null);
using (Subscription subscription = new Subscription(channel, queueName, false))
{
while (!Cancelled)
{
BasicDeliverEventArgs args;
if (!channel.IsOpen)
{
log.Error("The channel is no longer open, but we are still trying to process messages.");
throw new InvalidOperationException("Channel is closed.");
}
else if (!connection.IsOpen)
{
log.Error("The connection is no longer open, but we are still trying to process message.");
throw new InvalidOperationException("Connection is closed.");
}
bool gotMessage = subscription.Next(250, out args);
if (gotMessage)
{
log.Debug("Received message");
try
{
handler(args.Body);
}
catch (Exception e)
{
log.Debug("Exception caught while processing message. Will be bubbled up.", e);
throw;
}
log.Debug("Acknowledging message completion");
subscription.Ack(args);
}
}
}
}
}
}
Mise à JOUR:
J'ai simulé une panne de réseau, par l'exécution de l'server dans une machine virtuelle et je ne obtenir une exception (RabbitMQ.Client.Des Exceptions.OperationInterruptedException: Le AMQP opération a été interrompue) quand je me casse la connexion assez longtemps, alors peut-être que ce n'est pas un problème de réseau. Maintenant, je ne sais pas ce qu'il serait, mais il ne parvient pas au bout de seulement quelques heures de cours.
Vous devez vous connecter pour publier un commentaire.
EDIT: Depuis que je suis rebord de la prise en upvotes sur ce, je tiens à souligner que l' .NET RabbitMQ client a maintenant cette fonctionnalité intégrée: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery
Idéalement, vous devriez être en mesure de l'utiliser et d'éviter manuellement la mise en œuvre de la logique de reconnexion.
J'ai récemment eu à mettre en œuvre près de la même chose. À partir de ce que je peux dire, la plupart des informations disponibles sur RabbitMQ suppose que soit votre réseau est très fiable ou que vous exécutez une RabbitMQ courtier sur la même machine que n'importe quel client de l'envoi ou de la réception de messages, ce qui permet le Lapin à traiter de problèmes de connexion.
C'est vraiment pas dur à mettre en place le Lapin client pour être robuste contre les pertes de connexion, mais il y a quelques particularités que vous devez traiter.
La première chose que vous devez faire tourner sur le rythme cardiaque:
Réglage de la "RequestedHeartbeat" à 30 feront le client de vérifier toutes les 30 secondes si la connexion est toujours en vie. Sans cette allumé, le message, l'abonné devra s'asseoir là, heureusement en attente pour un autre message sans la moindre idée que sa connexion a mal tourné.
Tournant le battement de coeur le rend également le serveur de vérifier pour voir si la connexion est toujours en place, ce qui peut être très important. Si la connexion va mal après qu'un message a été ramassé par l'abonné, mais avant il a été reconnu, le serveur vient suppose que le client est de prendre beaucoup de temps, et le message est "coincé" sur la mort de la connexion jusqu'à ce qu'elle se ferme. Avec le rythme cardiaque activée, le serveur de reconnaître quand la connexion va mal et de le fermer, mettre le message dans la file d'attente pour un autre abonné peut s'en occuper. Sans le battement de cœur, j'ai dû passer manuellement et fermer la connexion chez le Lapin, la gestion de l'INTERFACE utilisateur, de sorte que le a collé le message peut obtenir passé à un abonné.
Deuxièmement, vous aurez besoin de gérer
OperationInterruptedException
. Comme vous l'avez remarqué, ce est généralement à l'exception du Lapin client jeter quand il détecte la connexion a été interrompue. SiIModel.QueueDeclare()
est appelée lorsque la connexion a été interrompue, c'est l'exception que vous obtiendrez. Gérer cette exception en vous débarrassant de votre abonnement, le canal, et de la connexion et de créer de nouvelles.Enfin, vous aurez à gérer ce que votre consommateur ne lorsque vous essayez de consommer des messages à partir d'une connexion fermée. Malheureusement, chacune de façon différente de consommer les messages d'une file d'attente chez le Lapin client semble réagir différemment.
QueueingBasicConsumer
jetteEndOfStreamException
si vous appelezQueueingBasicConsumer.Queue.Dequeue
sur une connexion fermée.EventingBasicConsumer
ne fait rien, puisque c'est juste en attente d'un message. De ce que je peux dire à partir de l'essayer, leSubscription
classe que vous soyez à l'aide d'semble renvoyer vrai à partir d'un appel àSubscription.Next
, mais la valeur deargs
est null. Encore une fois, gérer le présent en se débarrassant de votre connexion, le canal, et d'abonnement et de recréer eux.La valeur de
connection.IsOpen
sera mis à jour à False si la connexion échoue avec le rythme cardiaque, de sorte que vous pouvez vérifier si vous le souhaitez. Toutefois, étant donné le rythme cardiaque s'exécute sur un thread séparé, vous aurez toujours besoin de gérer le cas où la connexion est ouverte lorsque vous cochez la case, mais se ferme avantsubscription.Next()
est appelé.Une dernière chose à regarder dehors pour est
IConnection.Dispose()
. Cet appel va jeter unEndOfStreamException
si vous appelez dispose après que la connexion a été fermée. Cela semble être un bug pour moi, et je n'aime pas ne pas appeler dispose sur uneIDisposable
objet, je l'appelle et avaler l'exception.Mettre tous ensemble dans un rapide et sale exemple: