awaitable Tâche de la file d'attente
Je me demandais si il existe une mise en œuvre/wrapper pour ConcurrentQueue, semblable à BlockingCollection où la prise de la collection ne se bloque pas, mais est plutôt asynchrone et va provoquer une async attendent jusqu'à ce qu'un élément est placé dans la file d'attente.
Je suis venu avec ma propre mise en œuvre, mais il ne semble pas fonctionner comme prévu. Je me demande si je suis réinventer quelque chose qui existe déjà.
Voici mon oeuvre:
public class MessageQueue<T>
{
ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
ConcurrentQueue<TaskCompletionSource<T>> waitingQueue =
new ConcurrentQueue<TaskCompletionSource<T>>();
object queueSyncLock = new object();
public void Enqueue(T item)
{
queue.Enqueue(item);
ProcessQueues();
}
public async Task<T> Dequeue()
{
TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
waitingQueue.Enqueue(tcs);
ProcessQueues();
return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
}
private void ProcessQueues()
{
TaskCompletionSource<T> tcs=null;
T firstItem=default(T);
while (true)
{
bool ok;
lock (queueSyncLock)
{
ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
if (ok)
{
waitingQueue.TryDequeue(out tcs);
queue.TryDequeue(out firstItem);
}
}
if (!ok) break;
tcs.SetResult(firstItem);
}
}
}
- Selon notre thème conseils, "Quelques questions sont toujours hors-sujet, même si elles s'inscrivent dans l'une des catégories énumérées ci-dessus:...Questions nous demandant de recommander ou trouver un livre, un outil, une bibliothèque de logiciels, tutoriel ou d'autres ressources de site sont hors-sujet..."
- Une attendent-mesure de la file d'attente est ce que j'ai pensé récemment encore (voici ma question: stackoverflow.com/questions/52775484/...)! Il permettrait de résoudre BEAUCOUP de problèmes dans un microservices architecture, je crois! Mais dans ce cas, la file d'attente devrait probablement être une file d'attente persistante et non pas quelque chose en mémoire.
- Connexes: Est-il quelque chose comme asynchrone BlockingCollection<T>?
Vous devez vous connecter pour publier un commentaire.
Je ne sais pas de lock-solution gratuite, mais vous pouvez prendre un coup d'oeil à la nouvelle Bibliothèque de flux de données, une partie de la Async CTP. Un simple
BufferBlock<T>
devrait suffire, par exemple:De Production et de consommation sont plus facilement mises en œuvre via des méthodes d'extension sur le flux de données de types de bloc.
De Production est aussi simple que:
et de la consommation est asynchrone de prêt:
Je ne vous recommandons l'utilisation de Flux de données si possible; la fabrication d'un tel tampon à la fois efficace et correcte est plus difficile qu'il n'y paraît au premier abord.
Approche Simple avec C# 8.0
IAsyncEnumerable
et Bibliothèque de flux de donnéesAvec une mise en œuvre de
AsyncQueue
comme suit:IAsyncEnumerable
, mais je suis très familier avec javascriptSymbol.asyncIterator
qui ressemble plus ou moins le même concept.IEnumerable
, mais vous pouvez asynchrone attendent de nouveaux éléments, donc c'est un non-opération de blocage.Mon atempt (il a un évènement soulevée lors de la "promesse" est créé, et il peut être utilisé par un producteur externe pour savoir quand produire plus d'articles):
CancellationTokenRegistration
vous avez obtenu à partir de lacancellationToken.Register
appel.Il est peut-être exagéré pour votre cas d'utilisation (compte tenu de la courbe d'apprentissage), mais Réactif Extentions fournit toute la colle que vous pourriez jamais vouloir pour asynchrones composition.
Vous abonner à des changements, et ils sont poussés à vous qu'ils sont disponibles, et vous pouvez avoir le système de pousser les modifications sur un thread séparé.
Découvrez https://github.com/somdoron/AsyncCollection, vous pouvez retirer de façon asynchrone et C# 8.0 IAsyncEnumerable.
L'API est très similaire à BlockingCollection.
Avec IAsyncEnumeable:
Voici la mise en œuvre, je suis en train de l'utiliser.
Il fonctionne assez bon, mais il y a beaucoup de controverse sur
queueSyncLock
, comme je fais beaucoup de l'utilisation de laCancellationToken
pour annuler certaines des tâches en attente. Bien sûr, cela conduit à beaucoup moins de blocage, je voudrais le voir avec unBlockingCollection
mais...Je me demandais si il y a une plus lisse, lock gratuit moyen de parvenir à la même fin
Vous pouvez simplement utiliser un
BlockingCollection
( à l'aide de la valeur par défautConcurrentQueue
) et l'enveloppe de l'appel àTake
dans unTask
de sorte que vous pouvezawait
c':