dimanche 21 novembre 2010

Boost asio - Queues de messages - Un exemple complet

Voilà un exemple complet avec un service unique, et des queues de messages parallélisables (pour Prime) et séquentielles (pour Logger). Dans l'objet Logger, plutôt que de maintenir une référence sur le service, on construit un objet strand, lequel garantit que tous les appels passés par lui seront séquentiels.

Notez la création et la destruction de l'objet m_work. Lorsqu'il est détruit, il assure que les appels à io_service::run vont retourner une fois que tous les messages auront été traités, ce qui permet de terminer l'application une fois que toutes les requêtes auront été complétées. Afin également d'éviter la destruction prématurée des objets Logger et Prime, les appels à boost::bind prennent un shared_from_this qui va garder une référence sur l'objet au sein du message.

Pour compiler sous Unix:

g++ main.cpp -o main -lboost_system-mt -lboost_thread-mt


#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace boost::assign;

class Service : public boost::noncopyable
{
public:
Service(int nbThreads):
m_work(new boost::asio::io_service::work(m_service))
{
for(int i = 0; i < nbThreads; ++i)
{
m_threadGroup.create_thread
(boost::bind(&boost::asio::io_service::run,
boost::ref(m_service)));
}
}

~Service()
{
m_work.reset();
m_threadGroup.join_all();
}

boost::asio::io_service & getService()
{
return m_service;
}

private:
boost::asio::io_service m_service;
boost::scoped_ptr m_work;
boost::thread_group m_threadGroup;
};

class Logger : public boost::noncopyable,
public boost::enable_shared_from_this
{
public:
Logger(boost::asio::io_service & service):
m_strand(service)
{
}

void log(const std::string & message)
{
m_strand.dispatch(boost::bind(&Logger::doLog,
shared_from_this(),
message));
}

private:
void doLog(const std::string & message)
{
std::cout << message << std::endl;
}

boost::asio::io_service::strand m_strand;
};

class Prime : public boost::noncopyable,
public boost::enable_shared_from_this
{
public:
Prime(boost::asio::io_service & service,
const boost::shared_ptr & logger):
m_service(service),
m_logger(logger)
{
}

void nthPrime(size_t n)
{
m_service.dispatch(boost::bind(&Prime::doNthPrime,
shared_from_this(),
n));
}

private:
void doNthPrime(size_t n)
{
std::vector primes(1, 2);
size_t current = 2;
while(primes.size() < n)
{
current++;
size_t i;
for(i = 0; i < primes.size(); ++i)
{
if(current % primes.at(i) == 0)
break;
}
if(i >= primes.size())
{
primes.push_back(current);
}
}
std::ostringstream str;
str << n << "th prime is " << primes.at(n - 1);
m_logger->log(str.str());
}

boost::asio::io_service & m_service;
boost::shared_ptr m_logger;
};

int main()
{
boost::shared_ptr service
(new Service(3));
boost::shared_ptr logger
(new Logger(service->getService()));
boost::shared_ptr prime
(new Prime(service->getService(), logger));

std::vector values;
values +=
20000, 10000, 5000, 7500, 15000,
17000, 14000, 11000, 21000, 11000;

std::for_each(values.begin(),
values.end(),
boost::bind(&Prime::nthPrime, prime, _1));
}


Asio fournit bien plus qu'une couche d'abstraction au dessus des sockets réseau. Il serait dommage de cantonner cette bibliothèque à la communication client serveur, alors qu'elle fournit un paradigme puissant pour construire des applications événementielles sans avoir à s'arracher les cheveux sur les problèmes bas-niveau.

Aucun commentaire: