Multithread sous Linux
Applications -> Framework
Je veux designer des applications, très simples au départ, qui couvrent un spectre assez large de domaines (algos, système, bonnes pratiques etc) pour en déduire un framework qui en facilite l'écriture, les tests, la maintenance et la compréhension externe.
Application 1 : Mandelbrot threadé.
But assez simple : n threads qui font tourner un code de calcul de l'ensemble de Mandelbrot. Quand les n threads ont terminées leur travail, l'application sort.
#include <pthread.h>
//Le tableau des Ids système des threads
pthread_t* ThreadIds = NULL;
//Deux mutex : l'indice de la dernière thread créée et le nombre de threads qui tournent à un moment donné.
pthread_mutex_t ThreadIndexMutex, ThreadCountMutex;
int ThreadIndex = 0, ThreadCount = 0;
void* ThreadMain(void* arg); //La fonction qui démarre une thread du pool.
struct ThreadObject { int Index; }; //Les prémisses de la future classe Thread
void Mandelbrot(ThreadObject* host); //La fonction de calcul.
int main(int argc, char *argv[])
{
cout << "Test de gestion de threads" << endl;
//
int NbMaxThreads = 12;
ThreadIds = new pthread_t[NbMaxThreads];
pthread_mutex_init(&ThreadIndexMutex, NULL);
pthread_mutex_init(&ThreadCountMutex, NULL);
int i;
for(i=0; i<NbMaxThreads; i++) assert(pthread_create(ThreadIds+i, NULL, ThreadMain, NULL) == 0);
timespec Delay;
Delay.tv_sec = 0; Delay.tv_nsec = 100000;
nanosleep(&Delay, NULL);
while(ThreadCount) nanosleep(&Delay, NULL);
pthread_mutex_destroy(&ThreadIndexMutex);
pthread_mutex_destroy(&ThreadCountMutex);
return EXIT_SUCCESS;
}
void* ThreadMain(void* arg)
{
pthread_mutex_lock(&ThreadCountMutex);
ThreadCount++;
pthread_mutex_unlock(&ThreadCountMutex);
//
pthread_mutex_lock(&ThreadIndexMutex);
ThreadObject* Obj = new ThreadObject();
Obj->Index = ThreadIndex++;
pthread_mutex_unlock(&ThreadIndexMutex);
Mandelbrot(Obj);
//
pthread_mutex_lock(&ThreadCountMutex);
ThreadCount--;
pthread_mutex_unlock(&ThreadCountMutex);
}
//Mandelbrot en float
void Mandelbrot(ThreadObject* host)
{
static float xmin = -2.0, xmax = 0.6;
static float ymin = -1.2, ymax = 1.2;
int x, y, w = 800, h = 800;
float xinc = (xmax-xmin) / w;
float yinc = (ymax-ymin) / h;
float ci, cr;
for(ci=ymin, y=0; y<h; y++, ci+=yinc) {
for(cr=xmin, x=0; x<w; x++, cr+=xinc) {
float curr = cr, curi = ci;
int NbIter = 0;
while(NbIter < 255 && curr*curr + curi*curi < 4.0f) {
float tempr = curr;
curr = curr*curr - curi*curi + cr;
curi = 2.0f * tempr*curi + ci;
NbIter++;
}
}
printf("Thread %d : %.2f\n", host->Index, 100.0f*y/h);
}
}
Un code assez simple à comprendre mais qui contient tout un tas d'appels système qui compliquent la rédaction et la compréhension externe de l'application. Voilà à quoi elle peut ressembler si on fait un petit saut conceptuel :
class MandelbrotThread : WorkingThread
{
public:
virtual void init()
{
}
virtual void run()
{
static float xmin = -2.0, xmax = 0.6;
static float ymin = -1.2, ymax = 1.2;
int x, y, w = 800, h = 800;
float xinc = (xmax-xmin) / w;
float yinc = (ymax-ymin) / h;
float ci, cr;
for(ci=ymin, y=0; y<h; y++, ci+=yinc) {
for(cr=xmin, x=0; x<w; x++, cr+=xinc) {
float curr = cr, curi = ci;
int NbIter = 0;
while(NbIter < 255 && curr*curr + curi*curi < 4.0f) {
float tempr = curr;
curr = curr*curr - curi*curi + cr;
curi = 2.0f * tempr*curi + ci;
NbIter++;
}
}
printf("Thread %d : %.2f\n", Index, 100.0f*y/h);
}
}
virtual void exit()
{
}
protected:
};
int main(int argc, char *argv[])
{
cout << "Threads with classes !" << endl;
//
int i, NbMaxThread = 12;
for(i=0; i<NbMaxThread; i++) MandelbrotThread* CurThread = new MandelbrotThread();
while(ThreadCount.Value) WorkingThread::sleep(0, 100000);
return EXIT_SUCCESS;
}
Cette fois on ne se concentre que sur le but premier de l'application : créer n tâches (la boucle for de la fonction main()), le calcul des n tâches (la fonction virtuelle run de la classe MandelbrotThread) et attendre qu'il n'y ait plus aucune thread qui calcule (ThreadCount.Value doit être nul). Le problème étant qu'il y a beaucoup d'implicite dans ce code : c'est le framework qui prend forme :). Voilà ce qu'on doit inclure en .h ou en librairie pour que le code ci-dessus fonctionne (rappel : je travaille sous KDevelop avec GCC 3.4.6, YMMV)
template <class T> class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&Internal, NULL);
}
~Mutex()
{
pthread_mutex_destroy(&Internal);
}
void lock()
{
pthread_mutex_lock(&Internal);
}
void unlock()
{
pthread_mutex_unlock(&Internal);
}
//
T Value; //La valeur qu'héberge le mutex
protected:
pthread_mutex_t Internal; //The system mutex
};
Mutex<int> ThreadIndex; //Indice maximal des threads de l'application
Mutex<int> ThreadCount; //Nombre de thread concurrentes
class WorkingThread
{
public:
//L'exécution du constructeur de la classe de base d'une thread ne s'exécute pas dans la file d'exécution elle-même : la thread système n'est pas encore créée !
WorkingThread() //TODO : trouver un mécanisme pour ajouter des paramètres à ce constructeur, params qui viendraient du new Thread...
{
assert(pthread_create(&SystemThread, NULL, (void* (*) (void*)) ThreadMain, this) == 0);
}
virtual void init() = 0; //Equivalent du constructeur pour une thread
virtual void run() = 0;
virtual void exit() = 0; //Equivalent du destructeur
//Helper pour qu'une thread se mette en sommeil (attention, non thread-safe !)
static void sleep(int sec, int nsec)
{
timespec Delay;
Delay.tv_sec = sec; Delay.tv_nsec = nsec;
nanosleep(&Delay, NULL);
}
protected:
int Index; //Indice interne, i.e. une ID
pthread_t SystemThread; //Identifiant système de la thread
static void* ThreadMain(WorkingThread* host)
{
ThreadIndex.lock(); host->Index = ThreadIndex.Value++; ThreadIndex.unlock();
ThreadCount.lock(); ThreadCount.Value++; ThreadCount.unlock();
//
host->init();
host->run();
host->exit();
//
ThreadCount.lock(); ThreadCount.Value--; ThreadCount.unlock();
//Que retourner ???
}
};
Bien sûr que tout cela est foncièrement basique, mais ça prend forme, nous avons une classe de base pour les threads, un type pour les mutex et deux mutexes globaux pré-instanciés qui décomptent le nombre de threads actives et l'Id maximale d'une thread. De la même façon, le mécanisme de création générique de thread est lancé : un new XXXThread dans une thread parente va appeler le constructeur de la classe de base WorkingThread qui va créer la thread système (une p(osix)thread sous linux), trois méthodes virtuelles (init, run et exit) qui vont être appelée automatiquement par le framework au fur et à mesure de la vie de la tâche. C'est peu mais c'est déjà beaucoup.
J'ai déplacé des membres et l'helper 'sleep' de WorkingThread dans Thread, mais j'ai laissé la création de la working thread où elle se trouvait : ça fonctionne donc pas touche et comme Thread est une classe de base, elle ne va peut-être jamais représenter de thread concrètes et ce mécanisme permet d'avoir des thread potentiellement différentes dans chaque classe dérivée de Thread ce qui est bien ! Je laisse les virtuelles pures dans WorkingThread car elles ont leur place, alors que je ne sais pas encore ce que je vais faire de Thread... Je crée une classe Application plutôt vide pour l'instant et une classe dérivée pour le haut-niveau qui représente mon application : elle contient l'ancien contenu de la fonction main qui était auparavant apparente. J'ai créé une macro préprocesseur pour générer une fonction main() valide qui fait le lien avec l'objet dérivé d'Application : FFW_MAINENTRY.
J'ai profité de l'application précédente qui fonctionnait pour implémenter un système rudimentaire de messages inter-threads. Ainsi chacune des threads filles peut envoyer sous forme de simples chaînes de caractères son niveau de complétion pour le calcul qui la préoccupe. Ce message une fois reçu par la thread maîtresse est affiché (via la librairie ncurses) pour indiquer la complétion globale du programme.
class MandelbrotThread : public WorkingThread
{
public:
virtual ~MandelbrotThread() { }
virtual void run()
{
static float xmin = -2.0, xmax = 0.6;
static float ymin = -1.2, ymax = 1.2;
int x, y, w = 800, h = 800;
float xinc = (xmax-xmin) / w;
float yinc = (ymax-ymin) / h;
float ci, cr;
for(ci=ymin, y=0; y<h; y++, ci+=yinc) {
for(cr=xmin, x=0; x<w; x++, cr+=xinc) {
float curr = cr, curi = ci;
int NbIter = 0;
while(NbIter < 255 && curr*curr + curi*curi < 4.0f) {
float tempr = curr;
curr = curr*curr - curi*curi + cr;
curi = 2.0f * tempr*curi + ci;
NbIter++;
}
}
char* Str = NULL;
string_format(&Str, "%d:%.2f", Index, 100.0f*y/h);
sendTo(ParentThread, Str);
string_release(&Str);
sleep(0, 10); //Important pour que les threads plus équilibrées
}
}
};
class MandelApp : public Application
{
public:
virtual int run()
{
mtrace();
int AppRet;
if ((AppRet = Application::run()) != EXIT_SUCCESS) return AppRet;
initscr(); //Initialisation de NCurses
//
ThreadCount.Value = 0;
ThreadIndex.Value = 1;
cout << "Threads with classes !" << endl;
int i, NbMaxThread = 8;
Thread** ThreadsList = new Thread*[NbMaxThread];
float* Completions = new float[NbMaxThread];
for(i=0; i<NbMaxThread; i++) {
ThreadsList[i] = new MandelbrotThread();
}
WorkingThread::sleep(0, 100);
char* msg = NULL;
bool StillActives = true;
for(i=0; i<NbMaxThread; i++) Completions[i] = 0.0f;
while(ThreadCount.Value || StillActives) {
for(i=0, StillActives = false; i<NbMaxThread; i++) StillActives |= ThreadsList[i]->Index != 0;
int StoreNbMsgs = _Queue._Counter; //On sauve le nombre de messages en attente
while (hasMessage() && getMessage(&msg)) {
int ThreadNumber;
float Completion;
sscanf(msg, "%d:%f", &ThreadNumber, &Completion);
Completions[ThreadNumber-1] = Completion;
}
//On va afficher un panel contenant l'état d'avancement de chaque thread de calcul
char *ThreadMsg = NULL, *MainMsg = NULL;
string_format(&MainMsg, "Nombre de message en attente dans la pile principale : %d ", StoreNbMsgs);
mvprintw(0, 5, MainMsg);
for(i=0; i<NbMaxThread; i++) {
string_format(&ThreadMsg, "Avancement de la thread %d : %.2f / 100", i, Completions[i]);
mvprintw(i+2, 0, ThreadMsg);
}
refresh();
string_release(&ThreadMsg);
string_release(&MainMsg);
//
Thread::sleep(0, 10*1000*1000);
}
//
printf("On détruit les objets Thread\n");
for(i=0; i<NbMaxThread; i++) { delete ThreadsList[i]; ThreadsList[i] = NULL; }
printf("On détruit le tableau hébergeant la liste des threads\n");
delete []ThreadsList;
delete []Completions;
endwin(); //Sortie de NCurses
//
return EXIT_SUCCESS;
}
};
FFW_MAINENTRY(MandelApp)Pour faire ce programme j'ai modifié la hiérarchie des classes et ait eu besoin de créer la classe de base Thread. Pourquoi ? Parce que pendant qu'on y était, il était très simple de créer une classe Application (wrapper plus sympa que le classique main(...)) dérivée elle-même de Thread et d'avoir avec cette simple application un test utilisant deux types de threads différents toutes deux équipées d'une queue de messages qui leur permet de communiquer très simplement. Ma \"librairie\" (trop pompeux pour l'instant) est comme suit :
//Premières fonctions de la future classe String
void string_release(char** str)
{
if (*str) delete [](*str);
*str = NULL;
}
void string_create(char** pStr, int ln)
{
*pStr = new char[ln+1];
(*pStr)[ln] = '\0';
}
int string_format(char** destStr, const char* format, ...)
{
string_release(destStr);
//On détermine la taille de la future string
va_list StrVars;
va_start(StrVars, format);
int Len = vsnprintf(NULL, 0, format, StrVars);
string_create(destStr, Len);
va_end(StrVars);
//On refait la même chose mais en créant la string cette fois
va_start(StrVars, format);
vsnprintf(*destStr, Len+1, format, StrVars); //Len+1 car vsnprintf renvoie la taille sans '\0' mais écrit Len+1 chars y compris '\0'
va_end(StrVars);
return Len;
}
void string_affectCopy(char** pStr, const char* newVal)
{
string_release(pStr);
int Len;
string_create(pStr, Len = strlen(newVal));
strcpy(*pStr, newVal);
}
template <class T> class Mutex
{
public:
Mutex() { pthread_mutex_init(&_Internal, NULL); /*_Owner = 0;*/ }
~Mutex() { pthread_mutex_destroy(&_Internal); }
void lock() { pthread_mutex_lock(&_Internal); /*_Owner = pthread_self();*/}
void unlock() { pthread_mutex_unlock(&_Internal); /*_Owner = 0;*/ }
//
T Value; //La valeur qu'héberge le mutex
protected:
pthread_mutex_t _Internal; //The system mutex
// pthread_t _Owner; //Pour une future gestion des locks
};
class Thread
{
public:
class MessageQueue
{
public:
class Message
{
public:
Message(const char* str=NULL) { _Next = NULL; _Str = NULL; setStr(str); }
~Message() { string_release(&_Str); }
void setStr(const char* str) { string_affectCopy(&_Str, str); }
char* _Str;
Message *_Next;
};
//
MessageQueue()
{
_FirstMessage = _LastMessage = NULL;
_Counter = 0;
}
~MessageQueue()
{
ListReady.lock();
Message* CurMessage = _FirstMessage;
while (_FirstMessage) {
CurMessage = _FirstMessage->_Next;
delete _FirstMessage;
_FirstMessage = CurMessage;
}
_LastMessage = NULL;
ListReady.unlock();
}
//On empile un message à la fin de la liste
void push(const char* msg)
{
ListReady.lock();
//
Message* Shell = new Message(msg);
Shell->_Next = NULL;
if (_LastMessage) _LastMessage->_Next = Shell;
_LastMessage = Shell;
if (!_FirstMessage) _FirstMessage = _LastMessage;
_Counter++;
//
ListReady.unlock();
}
//On dépile le premier message de la liste
bool pop(char** str)
{
ListReady.lock();
//
if (!_Counter) { ListReady.unlock(); return false; }
Message* TempMsg = _FirstMessage;
_FirstMessage = _FirstMessage ? _FirstMessage->_Next : NULL;
if (TempMsg) {
string_affectCopy(str, TempMsg->_Str);
TempMsg->_Next = NULL;
delete TempMsg;
} else string_release(str);
if (!_FirstMessage) _LastMessage = NULL;
_Counter--;
//
ListReady.unlock();
return true;
}
//
Message *_FirstMessage, *_LastMessage;
int _Counter;
Mutex<int> ListReady;
};
public:
Thread() { }
virtual ~Thread() { ParentThread = NULL; SystemThread = 0; }
//Renvoie true si la thread courant a des messages en attente, false sinon.
bool hasMessage() const { return _Queue._Counter; }
//Renvoie true si msg a été rempli avec le premier message de la liste d'attente, false si cette liste ne contenait aucun message en attente.
bool getMessage(char** msg) { return _Queue.pop(msg); }
//TODO : accèder à une thread par son index plutôt que par un pointeur.
void sendTo(Thread* target, const char* msg) { target->_Queue.push(msg); }
//Helper pour qu'une thread se mette en sommeil (attention, non thread-safe !)
static void sleep(int sec, int nsec)
{
timespec Delay;
Delay.tv_sec = sec; Delay.tv_nsec = nsec;
nanosleep(&Delay, NULL);
}
//
int Index; //Indice interne, i.e. une ID
pthread_t SystemThread; //Identifiant système de la thread
MessageQueue _Queue;
Thread* ParentThread;
};
class Application : public Thread
{
public:
Application() { ParentThread = NULL; SystemThread = pthread_self(); }
virtual int run()
{ cout << "Application..." << endl; return EXIT_SUCCESS; }
};
static Application* MainApp = NULL;
//Intégration de la fonction main(), obligatoire en C++, et d'un objet dérivé d'Application.
#define FFW_MAINENTRY(AppClassName) \
int main(int argc, char *argv[]) \
{ \
MainApp = new AppClassName(); \
int Ret = MainApp->run(); \
delete MainApp; \
return Ret; \
}
Mutex<int> ThreadIndex; //Indice maximal des threads de l'application
Mutex<int> ThreadCount; //Nombre de thread concurrentes
class WorkingThread : public Thread
{
public:
//L'exécution du constructeur de la classe de base d'une thread ne s'exécute pas dans la file d'exécution elle-même : la thread système n'est pas encore créée !
WorkingThread() //TODO : trouver un mécanisme pour ajouter des paramètres à ce constructeur, params qui viendraient du new Thread...
{
int Ret = pthread_create(&SystemThread, NULL, (void* (*) (void*)) ThreadMain, this);
assert(!Ret);
}
virtual ~WorkingThread() { }
virtual void init() {} //Equivalent du constructeur pour une thread
virtual void run() {};
virtual void exit() {}; //Equivalent du destructeur
protected:
static void* ThreadMain(WorkingThread* host)
{
ThreadIndex.lock(); host->Index = ThreadIndex.Value++; ThreadIndex.unlock();
ThreadCount.lock(); ThreadCount.Value++; ThreadCount.unlock();
//
host->ParentThread = MainApp;
host->init();
host->run();
host->exit();
//
ThreadCount.lock(); ThreadCount.Value--; ThreadCount.unlock();
//
ThreadIndex.lock();
host->Index = 0;
ThreadIndex.unlock();
//
return EXIT_SUCCESS;
}
};GUI Simple : apprentissage de X11
Des objets simples (carrés, rectangles, ronds) que l'on déplace sur un canevas, dont on peut modifier et visualiser les propriétés et que l'on peut importer/exporter. On va réaliser l'application en deux temps : design préliminaire + pseudo-code pour réaliser l'application suivante et retour sur une implémentation réaliste.
MiniGUI
Fenêtres, boutons, statics (en utilisant FreeType ou Pango ?), custom-control. Rien de bien complexe mais il faut apprendre X11 en en profitant pour améliorer/corriger le framework et peut-être implémenter des nouveaux mécanismes.
Testeur de ma HashTable
Quelques actions pour remplir et tester la table, et un panel de profile (temps machine, remplissage de la table).
Mandelbrot Renderer
Un panel de calcul et de rendu et un autre de stats (position, temps machine détaillé, méthode utilisée etc).
Moteur 3D simplifié
Un ensemble d'objets simples (cubes et sphère) avec la possibilité de se promener, de cliquer sur un objet et d'obtenir ses propriétés.
Commentaires...
//Comment lier tout ce joli schéma ci-dessous avec un double tableau ID<-->Pointeur ?
class SmartPointer
{
public:
SmartPointer(const T*); //Copy-constructeur
operator=(const T*); //Copy-constructeur
T* operator->() const;
T& operator.() const;
protected:
T* _Pointee;
};
//Un Array avec des méthodes facilitant le parcours de ses éléments
template <class T> class Collection
{
public:
Collection() { _Cursor = 0; _Count = 0; _List = NULL; }
~Collection() { }
//Helpers
void start() { _Cursor = 0; }
void end() { _Cursor = _Count-1; }
bool isLast() const { return _Cursor == _Count; }
int length() const { return _Count; }
//Manipulation de la liste
int push(T*);
bool set(index, T*);
bool get(index, SmartPointer<T>& item) const
{
item = _List[index]; //Copie-constructeur du SmartPointer
return true;
}
merge(const Collection<T>)
//Parcours de la liste
bool forEach(SmartPointer<T>& item) const
{
if (isLast()) return false;
return get(_Cursor++, item);
}
protected:
T** _List;
mutable int _Cursor;
int _Count;
};
Collection<Object3D> Coll;
//On remplit la collection avec des pointeurs vers des éléments de type Object3D
Coll.start();
SmartPointer<Object3D> Object;
while(Coll.forEach(Object)) {
printf(Object.position);
}
