dimanche 21 novembre 2010

Boost asio - Queues de messages

La bibliothèque Asio, intégrée à Boost, permet de gérer des événements de manière asynchrone. Principalement utilisée pour le réseau, elle peut également être très utile pour résoudre des problèmes de programmation concurrente. Un exemple très simple: la gestion de queues de messages.

Prenons donc une classe qui affiche des messages sur la console. Une manière de gérer l'accès concurrent est de mettre un bon vieux mutex autour de l'appel à std::cout, forçant chaque thread à attendre sur des opérations d'entrées sorties potentiellement longues. Une autre manière, souvent plus efficace, est d'ajouter le message sur une queue, laquelle est lue par un thread unique chargé de l'affichage. Comment faire de même avec Asio?


class Logger
{
public:
Logger():
m_work(m_service),
m_thread(boost::bind(&boost::asio::io_service::run,
boost::ref(m_service)))
{
}

~Logger()
{
m_service.stop();
m_thread.join();
}

void log(const std::string & message)
{
m_service.dispatch(boost::bind(&Logger::doLog, this, message));
}

private:
void doLog(const std::string & message)
{
std::cout << message << std::endl;
}

boost::asio::io_service m_service;
boost::asio::io_service::work m_work;
boost::thread m_thread;
};

Cette classe s'utilise simplement en instanciant l'objet Logger, puis en appelant la méthode log. L'appel à log est thread safe, et peut donc être appelé par n'importe quel thread de l'application. L'affichage, lui, ne tourne que depuis le thread interne de la classe. Pas de mutex, pas de conditions, tout est planqué dans Asio.

Remarquez l'objet work, qui indique à m_service qu'il y a toujours quelque chose à faire, pour éviter que m_service.run() ne retourne prématurément.

Et pour l'opération inverse, c'est à dire l'utilisation d'une queue de messages pour faire tourner des calculs potentiellement lourds simultanément? C'est à peine plus complexe, comme par exemple dans cette classe qui calcule le n-ième nombre premier.


class Prime
{
public:
Prime():
m_work(m_service)
{
m_threadGroup.create_thread(boost::bind(&boost::asio::io_service::run,
boost::ref(m_service)));
m_threadGroup.create_thread(boost::bind(&boost::asio::io_service::run,
boost::ref(m_service)));
m_threadGroup.create_thread(boost::bind(&boost::asio::io_service::run,
boost::ref(m_service)));
m_threadGroup.create_thread(boost::bind(&boost::asio::io_service::run,
boost::ref(m_service)));
}

~Prime()
{
m_service.stop();
m_threadGroup.join_all();
}

void nthPrime(size_t n)
{
m_service.dispatch(boost::bind(&Prime::doNthPrime, this, n));
}

private:
void doNthPrime(size_t n)
{
std::vector primes(1, 2);
size_t current = 2;
while(primes.size() < n)
{
current++;
size_t i;
for(i = 0; i < primes.size(); ++i)
{
if(current % primes.at(i) == 0)
break;
}
if(i >= primes.size())
{
primes.push_back(current);
}
}
std::cout << primes.at(n - 1);
}

boost::asio::io_service m_service;
boost::asio::io_service::work m_work;
boost::thread_group m_threadGroup;
};

Faisant tourner 4 threads à partir du bon vieux thread pool de chez Boost, cette classe va empiler les requêtes et n'en faire tourner que 4 maximum à la fois.

L'étape suivant serait d'appeler Logger depuis Prime, afin de n'afficher le résultat que depuis un seul thread. L'on peut ainsi construire un programme à partir de briques de base qui gèrent elles-mêmes l'accès concurrent, sans avoir à se (trop) se préoccuper de l'accès concurrent ou d'interbloquages.

Cette approche est cependant limitée par l'explosion du nombre de threads dans l'application, chaque brique gérant un certain nombre de threads indépendants. Lorsque la charge de travail est lourde, l'on risque de se retrouver avec beaucoup plus de threads tentant de s'exécuter que de processeurs.

L'on peut alors modifier quelque peu son approche, pour partager un même service asio au sein du programme, et laisser à la bibliothèque le soin d'ordonnancer les tâches.

S'assurer que l'accès à certaines ressources demeure séquentiel, comme pour notre Logger, peut se faire grâce aux strands. Démonstration dans un prochain post.

Aucun commentaire: