Pourquoi l'itération sur GetConsumingEnumerable () ne vide-t-elle pas complètement la collection de blocage sous-jacente?
J'ai un quantifiables & reproductible problème à l'aide de la Task Parallel Library, BlockingCollection<T>
ConcurrentQueue<T>
& GetConsumingEnumerable
tout en essayant de créer un simple pipeline.
En un mot, l'ajout d'entrées à un défaut BlockingCollection<T>
(qui, sous le capot est en s'appuyant sur un ConcurrentQueue<T>
) à partir d'un fil, ne garantit pas qu'ils seront sauté le BlockingCollection<T>
à partir d'un autre thread appelle la GetConsumingEnumerable()
Méthode.
J'ai créé un très simple Winforms Application à reproduire/simuler ce qui imprime juste entiers à l'écran.
Timer1
est responsable de la mise en attente jusqu'les éléments de travail... Il utilise un simultanées dictionnaire appelé_tracker
de sorte qu'il sait ce qu'il a déjà ajouté le blocage de la collection.Timer2
est juste l'enregistrement de l'compter duBlockingCollection
& de la_tracker
- Le bouton DÉMARRER coup d'envoi à un
Paralell.ForEach
qui, tout simplement, parcourt le blocage des collectionsGetConsumingEnumerable()
et commence à imprimer à la deuxième zone de liste. - La touche STOP arrête
Timer1
permet d'éviter les entrées d'être ajouté à l'blocage de la collection.
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} /Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
Voici la séquence des événements:
- Appuyez Sur Start
- Timer1 tiques & ListBox1 est immédiatement mis à jour avec de 3 messages (Ajout de 0, 1, 2)
- ListBox2 est mis à jour avec 3 messages, 1 seconde d'intervalle
- Traitement 0
- Traitement 1
- De traitement de 2
- Timer1 tiques & ListBox1 est immédiatement mis à jour avec de 3 messages (Ajout de 3, 4, 5)
- ListBox2 est sbsequent mis à jour avec les 2 messages, 1 seconde d'intervalle
- De traitement de 3
- De traitement de 4
- De traitement de 5 n'est pas imprimé... semble avoir "disparu"
- Appuyez sur STOP pour empêcher plus de messages ajoutés par minuterie 1
- Attendre... "Traitement de l'5" n'apparaît toujours pas
Vous pouvez voir que la concurrente dictionnaire est toujours suivi 1 élément n'a pas encore été traitée, & par la suite retirés de _tracker
Si j'Appuyez de nouveau sur Start, puis timer1 commence par l'ajout de plus de 3 de plus et les entrées de la boucle Parallèle revient à la vie d'impression 5, 6, 7 & 8.
Je suis à une perte complète de pourquoi cela se produit. L'appel de recommencer évidemment appelle une newtask, qui appelle un Paralèlle foreach, et re-exécute GetConsumingEnumerable() qui comme par magie trouve l'entrée manquante... je
Pourquoi le BlockingCollection.GetConsumingEnumerable()
ne garantissant pas à itérer sur chaque élément qui est ajouté à la collection.
Pourquoi l'ajout de plus d'entrées de provoquer par la suite elle à se "décoller" et continuer avec le traitement?
source d'informationauteur Eoin Campbell
Vous devez vous connecter pour publier un commentaire.
Vous ne pouvez pas utiliser
GetConsumingEnumerable()
dansParallel.ForEach()
.Utiliser le
GetConsumingPartitioner
de la TPL extrasDans le billet de blog vous permettra aussi d'obtenir une explication, pourquoi ne pouvez pas utiliser
GetConsumingEnumerable()
c'est à dire Parallèle.ForEach attendre jusqu'à ce qu'il reçoit d'un groupe d'éléments de travail avant de continuer. Exactement ce que votre expérience de montre.
Comme de .net 4.5, vous pouvez créer un programme de partitionnement qui va prendre 1 seul élément à la fois:
https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs. 110).aspx
Je n'arrivais pas à reproduire votre comportement avec console simple demande de faire la même chose (en cours d'exécution sur .Net 4.5 beta, ce qui pourrait faire une différence). Mais je pense que la raison à cela est que
Parallel.ForEach()
essaie d'optimiser l'exécution par le fractionnement de la collection d'entrée en morceaux. Et avec votre énumérable, un morceau ne peut pas être créé jusqu'à ce que vous ajoutez des éléments à la collection. Pour plus d'informations, voir Personnalisé Partitioners pour PLINQ et TPL sur le site MSDN.Pour résoudre ce problème, n'utilisez pas
Parallel.ForEach()
. Si vous voulez continuer à traiter les éléments en parallèle, vous pouvez démarrer uneTask
à chaque itération.Je sens que je note juste pour plus de clarté que dans le cas où vous êtes en mesure d'appeler le BlockingCollection .CompleteAdding() la méthode avant l'exécution de la Parallèle.foreach, le problème que vous décrivez ci-dessus ne sera pas un problème. J'ai utilisé ces deux objets ensemble à de nombreuses reprises avec d'excellents résultats.
En outre, vous pouvez toujours re-configurer votre BlockingCollection après l'appel de CompleteAdding() pour ajouter d'autres éléments, en cas de besoin (_entries = new BlockingCollection();)
Changer le code d'événement click ci-dessus comme suit permettrait de résoudre votre problème avec l'entrée manquante et de le faire fonctionner comme prévu, si vous cliquez sur démarrer et d'arrêter les boutons plusieurs fois: