L'arrêt de C++ 11 std::threads en attente sur un std::condition_variable

J'essaie de comprendre la base de multithreading les mécanismes du C++ 11. L'exemple le plus simple je pense est le suivant:

  • Un producteur et un consommateur, sont mis en œuvre dans des threads séparés
  • Le producteur des lieux un certain nombre d'éléments à l'intérieur d'une file d'attente
  • Le consommateur prend les éléments de la file d'attente si il y a aucun présent

Cet exemple est également utilisé dans de nombreux livres d'école sur le multithreading et tout sur le processus de communication fonctionne très bien. Cependant, j'ai un problème quand il s'agit de l'arrêt du thread consommateur.

Je veux le consommateur à exécuter jusqu'à ce qu'il obtient explicitement un signal d'arrêt (dans la plupart des cas, cela signifie que je l'attends pour le producteur pour finir donc, je peux arrêter le consommateur avant que le programme est terminé). Malheureusement, C++ 11 fils de l'absence d'un mécanisme d'interruption (qui, je le sais depuis le multithreading en Java par exemple). Donc, je dois utiliser des drapeaux comme isRunning pour signaler que je veux un thread de s'arrêter.

Le principal problème aujourd'hui est: Après j'ai arrêté le producteur fil, la file d'attente est vide et que le consommateur est en attente sur une condition_variable pour obtenir un signal lorsque la file d'attente est remplie de nouveau. J'ai donc besoin de réveiller le thread en appelant notify_all() sur la variable avant de quitter.

J'ai trouvé une solution qui fonctionne, mais il semble quelque peu chaotique.
L'exemple de code est indiqué ci-dessous (je suis désolé, mais de toute façon je ne pouvais pas réduire la taille du code tout plus pour un "minimum" exemple minimal):

La classe de File d'attente:

class Queue{
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });

        if (m_isProgramStopped){
            throw std::exception("Program stopped!");
        }

        int x = m_q.front();
        m_q.pop();

        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

    void stop(){
        m_isProgramStopped = true;
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};

Le Producteur:

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_counter;
};

Le Consommateur:

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (std::exception e){
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_takeCounter;
    bool m_isRunning;
};

Et enfin la main():

int main(void){
    Queue q;

    Consumer cons{ q };
    Producer prod{ q };

    cons.execute();
    prod.execute();

    prod.join();

    cons.stop();
    q.stop();

    cons.join();

    std::cout << "END" << std::endl;

    return EXIT_SUCCESS;
}

Est-ce la droit façon de terminer un thread qui attend une variable de condition ou il y a de meilleures méthodes? Actuellement, la file d'attente a besoin de savoir si le programme est arrêté (qui à mon avis détruit le couplage des composants) et j'ai besoin d'appeler stop() sur la file d'attente explicitement ce qui ne semble pas juste.

De plus, la variable de condition qui doit seulement être utilisé comme un singal si la file d'attente est vide maintenant représente une autre condition - si la fin du programme. Si je ne me trompe pas, chaque fois qu'un thread attend une variable de condition pour un événement se produise, il doit également vérifier si le fil doit être arrêté avant de poursuivre son exécution (ce qui semble également mal).

Puis-je avoir ces problèmes parce que toute ma conception est défectueux ou alors j'ai loupé certains mécanismes qui peuvent être utilisés à la sortie de threads de manière propre?

  • C'est à peu près ce que nous faisons dans notre code. Définir un "arrêt" de la variable et de l'informer de la variable de condition, et de tester le drapeau que la première chose qu'il fait. Il ne semble pas être une chose facile, plus élégant générale de la solution que nous avons trouvé.
  • Vous pouvez appeler stop() de le destructeur de votre file d'attente. Voir une solution similaire stackoverflow.com/a/9711916/412080
  • Si vous voulez juste pour mettre en œuvre un "Producteur/Consommateur", puis il y a d'autres approches de résoudre ce problème sans la nécessité de trouver une solution pour le méchant problème de la "invalider" ou "annuler" un les primitives de synchronisation, afin de reprendre un fil.
  • Vous pouvez également utiliser une Sentinelle (c2.com/cgi/wiki?SentinelPattern)
  • Les deux Queue::m_isProgramStopped et Consumer::m_isRunning devrait être atomic<bool> ou atomic_flag parce que main() écrit deux d'entre eux tandis que pas protégée par un mutex lock.
  • Je pense que la variable de condition de notifier et d'attendre fournir une barrière de mémoire rendant inutiles. Peut-être. Ne pas me citer sur que si...
  • Faire Queue::m_isProgramStopped atomique rendra les données de la course libre, mais permet toujours la course entre le notify dans stop et wait dans pop. Il est possible pour un thread dans stop à notify après un thread dans pop a vérifié m_isProgramStopped mais avant il a dormi sur la variable de condition, résultant dans la notification de perdu et un thread principal qui attend jamais pour un consommateur qui ne sera plus jamais quitter. La meilleure solution est de surveiller tous les accès aux m_isProgramStopped avec m_mtx.
  • Ce que Casey a dit. Avisant conditions doivent être verrouillé par la même mutex sur qui il attend quelque part ailleurs dans le code. J'ai fait la même erreur before. Essentiellement, la première ligne de File d'attente::arrêt doit être verrouillage m_mtx.
  • Sur le style: si vous ne vous attendez pas à appeler execute et stop/join plusieurs fois, mettre le corps de ces fonctions dans les constructeurs et les destructeurs, respectivement. Comme il est, il est un peu C-ish.