Pourquoi l'itération sur GetConsumingEnumerable () ne vide-t-elle pas complètement la collection de blocage sous-jacente?

J'ai un quantifiables & reproductible problème à l'aide de la Task Parallel Library, BlockingCollection<T>ConcurrentQueue<T> & GetConsumingEnumerable tout en essayant de créer un simple pipeline.

En un mot, l'ajout d'entrées à un défaut BlockingCollection<T> (qui, sous le capot est en s'appuyant sur un ConcurrentQueue<T>) à partir d'un fil, ne garantit pas qu'ils seront sauté le BlockingCollection<T> à partir d'un autre thread appelle la GetConsumingEnumerable() Méthode.

J'ai créé un très simple Winforms Application à reproduire/simuler ce qui imprime juste entiers à l'écran.

  • Timer1 est responsable de la mise en attente jusqu'les éléments de travail... Il utilise un simultanées dictionnaire appelé _tracker de sorte qu'il sait ce qu'il a déjà ajouté le blocage de la collection.
  • Timer2 est juste l'enregistrement de l'compter du BlockingCollection & de la _tracker
  • Le bouton DÉMARRER coup d'envoi à un Paralell.ForEach qui, tout simplement, parcourt le blocage des collections GetConsumingEnumerable() et commence à imprimer à la deuxième zone de liste.
  • La touche STOP arrête Timer1 permet d'éviter les entrées d'être ajouté à l'blocage de la collection.
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory(); 
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} /Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}

Voici la séquence des événements:

  • Appuyez Sur Start
  • Timer1 tiques & ListBox1 est immédiatement mis à jour avec de 3 messages (Ajout de 0, 1, 2)
  • ListBox2 est mis à jour avec 3 messages, 1 seconde d'intervalle
    • Traitement 0
    • Traitement 1
    • De traitement de 2
  • Timer1 tiques & ListBox1 est immédiatement mis à jour avec de 3 messages (Ajout de 3, 4, 5)
  • ListBox2 est sbsequent mis à jour avec les 2 messages, 1 seconde d'intervalle
    • De traitement de 3
    • De traitement de 4
    • De traitement de 5 n'est pas imprimé... semble avoir "disparu"
  • Appuyez sur STOP pour empêcher plus de messages ajoutés par minuterie 1
  • Attendre... "Traitement de l'5" n'apparaît toujours pas

Pourquoi l'itération sur GetConsumingEnumerable () ne vide-t-elle pas complètement la collection de blocage sous-jacente?

Vous pouvez voir que la concurrente dictionnaire est toujours suivi 1 élément n'a pas encore été traitée, & par la suite retirés de _tracker

Si j'Appuyez de nouveau sur Start, puis timer1 commence par l'ajout de plus de 3 de plus et les entrées de la boucle Parallèle revient à la vie d'impression 5, 6, 7 & 8.

Pourquoi l'itération sur GetConsumingEnumerable () ne vide-t-elle pas complètement la collection de blocage sous-jacente?

Je suis à une perte complète de pourquoi cela se produit. L'appel de recommencer évidemment appelle une newtask, qui appelle un Paralèlle foreach, et re-exécute GetConsumingEnumerable() qui comme par magie trouve l'entrée manquante... je

Pourquoi le BlockingCollection.GetConsumingEnumerable() ne garantissant pas à itérer sur chaque élément qui est ajouté à la collection.

Pourquoi l'ajout de plus d'entrées de provoquer par la suite elle à se "décoller" et continuer avec le traitement?

source d'informationauteur Eoin Campbell