Segue il precedente post dove illustro una soluzione scalabile per l'implementazione di una procedura di elaborazione di back-end con utilizzo del thread pool.

Form principale

La form principale si occupa di costruire e inizializzare l'engine e avviarne l'esecuzione, l'engine viene eseguito in un thread e comunica con la form principale attraverso eventi asincroni (vedi precedente post Sincronizzare un applicazione Windows Forms con un thread separato definendo un servizio astratto). Il thread dell'engine principale rimane in esecuzione finchè tutte le attività non sono state processate oppure avviene un eccezione inaspettata.

L'engine dispone di un entry-point denominato Start() che avvia il thread di esecuzione principale (2o livello descritto nel post precedente).

    public abstract class Engine

    {

        ...

        public void Start()

        {

            thread = new Thread(StartThread);

            thread.Start();

        }

 

        private void StartThread()

        {

            requestStop = false;

 

           try

           {

              NotifyStartInfo();

              StartEngine();

              InitStatistics();

              MainLoop();

              NotifyEndInfo();

           }

           catch (Exception exception)

           {

              lastRunException = exception;

           }

 

            InvokeOnSchedulerTerminated();

        }

 

        protected abstract void MainLoop();

       ... 

}

Come si può vedere la classe Engine è astratta, in particolare il metodo MainLoop() della sotto classe deve implementare il ciclo principale, che nella sostanza è un algoritmo di tipo produttore/consumatore, dove, nel nostro caso il produttore genera i task di elaborazione mentre il consumo dei task avviene nel thread pool.

Soluzione basata su Thread Pool con .NET Framework 3.5

Innanzitutto per implementare la soluzione con il .NET Framework 3.5 ho realizzato alcune classi Helper, la più importante è la classe denominata TaskNet35, la quale fornisce metodi per creare, avviare e sincronizzarsi con un Task messo in coda per l'esecuzione nel Thread Pool (Per approfondimenti vedi Programming the Thread Pool in the .NET Framework). Di seguito ne riporto le parti più importanti.

    /// <summary>

    /// Rappresenta un task da eseguire

    /// </summary>

    public abstract class TaskNet35

    {

        public static int MaxWaitHandlersCount = 64;

 

        public event EventHandler TaskFinished;

 

        private ManualResetEvent finishedEvent = new ManualResetEvent(false);

        private bool requestStop = false;

        private bool finished;

        private bool started;

        private Exception taskException;

 

        ... 

        public void Start()

        {

            started = true;

 

            ThreadPool.QueueUserWorkItem(InternalTaskOperation);

        }

 

        private void RequestStop()

        {

            requestStop = true;

        }

 

        private void InternalTaskOperation(object state)

        {

            if (requestStop)

            {

                SignalTaskFinished();

                return;

            }

 

            try

            {

                TaskOperation();

            }

            catch (Exception exception)

            {

                taskException = exception;

            }

 

            SignalTaskFinished();

            InvokeTaskFinished();

        }

 

        private void SignalTaskFinished()

        {

            finished = true;

            finishedEvent.Set();

        }

 

        /// <summary>

        /// Attività del task da implementare nella sottoclasse

        /// </summary>

        protected abstract void TaskOperation();

 

        /// <summary>

        /// Attende che uno qualsiasi dei task passati siano terminati

        /// </summary>

        public static void WaitAny(TaskNet35[] tasks)

        {

            ManualResetEvent[] handles = (from task in tasks select task.finishedEvent).ToArray();

            if (handles.Length > 0)

                WaitHandle.WaitAny(handles);

        }

 

        /// <summary>

        /// Attende che tutti i task passati siano terminati

        /// </summary>

        public static void WaitAll(TaskNet35[] tasks)

        {

            ManualResetEvent[] handles = (from task in tasks select task.finishedEvent).ToArray();

 

            if (handles.Length > 0)

                WaitHandle.WaitAll(handles);

        }

       ... 

    }

Come si può vedere la classe fornisce due metodi statici WaitAll e WaitAny per sincronizzarsi con il vettore di Task passati, il primo metodo rimane in attesa fino a quando tutti i task del vettore sono terminati, il secondo invece rimane in attesa l'esecuzione fino a quando almeno uno dei task in esecuzione è terminato. Per implementare la sincronizzazione ogni istanza di TaskNet35 incapsula un oggetto ManualResetEvent, un evento inizializzabile manualmente per la sincronizzazione multithread. Inoltre il metodo Start() inserisce nella coda del Thread Pool il task attraverso il metodo statico QueueUserWorkItem la cui implementazione è rappresentata dalla funzione InternalTaskOperation(), importante è la chiamata di finishedEvent.Set() che imposta a true lo stato dell'oggetto di sincronizzazione e quindi sblocca l'eventuale thread in attesa. Inoltre la classe implementa un sistema di annullamento dell'esecuzione, attraverso il campo requestStop di tipo bool e la chiamata al metodo RequestStop(), questo approccio prevede che se il task è stato inserito nella coda del thread pool e non è ancora stato eseguito, quest'ultimo termina immediatamente.

Successivamente ho implementato una classe denominata TaskPoolNet35 che fa da "scheduler" per l'avvio dei Task controllando la concorrenza di esecuzione dei task qui di seguito riporto tutto il codice della classe essendo relativamente semplice.

    /// <summary>

    /// Limita l'esecuzione della concorrenza di task nel Thread pool.

    /// </summary>

    public class TaskPoolNet35

    {

        private readonly TaskNet35[] tasks;

 

        public TaskPoolNet35()

        {

            tasks = new TaskNet35[TaskNet35.MaxWaitHandlersCount];

        }

 

        public TaskPoolNet35(int numThreads)

        {

            tasks = new TaskNet35[numThreads];

        }

 

        public void AddWaitStartTask(TaskNet35 task)

        {

            for (int idx = 0; idx < tasks.Length; idx++)

            {

                if(tasks[idx]==null || tasks[idx].Finished)

                {

                    tasks[idx] = task;

                    task.Start();

                    return;

                }

            }

 

            TaskNet35.WaitAny(tasks);

            AddWaitStartTask(task);

        }

 

        public void WaitAll()

        {

            bool containsAlmostOneTask = tasks.Any(t => t != null);

 

            if (containsAlmostOneTask)

                TaskNet35.WaitAll(tasks.ToArray());

        }

 

        public void StopAll()

        {

            bool containsAlmostOneTask = tasks.Any(t => t != null);

 

            if (containsAlmostOneTask)

                TaskNet35.StopAll(tasks.ToArray());

        }

    }

Come si può vedere il core è la funzione AddWaitStartTask() che ha due compiti, inserire il task in un vettore ed eseguirlo e quando il vettore è pieno attendere che un task si liberi per inseirlo nel vettore ed eseguirlo. Usando opportunamente la ricorsività la funzione AddWaitStartTask() risulta molto compatta. Nel costruttore della classe TaskPoolNet35 deve essere indicato il parallelismo voluto, se non indicato si assume sia 64, che è il limite massimo di Handle sincronizzabili con le funzioni WaitHandle.WaitAll(..) e WaitHandle.WaitAny(..).

A questo punto, è sufficiente implementare la sottoclasse di TaskNet che implementa l'esecuzione dell'operazione effettiva (nell'esempio sottoriportato MyTaskNet35 è sottoclasse di TaskNet35 nel cui costruttore viene passato come parametro una classe nota denominata TaskFakeMixed che fornisce un motodo ProcessTask() il quale implementa il task effettivo, in alternativa si può implementare la logica del task direttamente nella sottoclasse, oppure passare un delegato nel costruttore della sottoclasse come avviene ad esempio con la classe Task del .NET Framework 4.0). Il MainLoop() dell'engine risulterà relativamente semplice.

        protected override void MainLoop()

        {

            taskPool = numThreads.HasValue ? new TaskPoolNet35(numThreads.Value) : new TaskPoolNet35();

 

            foreach (TaskFakeMixed task in TaskProvider.Tasks)

            {

                TaskFakeMixed taskToExecute = task;

                MyTaskNet35 myTaskNet35 = new MyTaskNet35(taskToExecute);

                myTaskNet35.TaskFinished += myTaskNet35_TaskFinished;

 

                //Add task wait if current running tasks concurrency > numThreads

                taskPool.AddWaitStartTask(myTaskNet35);

 

                if (IsRequestStop)

                    break;

            }

 

            //Wait all tasks

           taskPool.WaitAll();

 

       }

 

Importante notare il meccanismo di "uscita" nel caso di annullamento dell'esecuzione, questo si limita al controllo della proprietà IsRequestStop, in quanto l'esecuzione è già sincronizzata e limitata nel parallelismo dal metodo AddWaiStartTask(...), il quale mette in attesa il thread dell'Engine quando il vettore di task in esecuzione è completo. La chiamata a WaitAll() al termine è fondamentale per aspettare che tutti i task in coda siano eseguiti prima di restituire il controllo al chiamante e quindi terminare il thread di esecuzione del motore.

Nel prossimo post illusterò la medesima soluzione con l'ausilio dei "facilitatori" forniti dal .NET Framerwork 4.0, in particolare illustrerò una soluzione equivalente, e una soluzione alternativa utilizzando le parallel extension.

Indice post - Soluzione di back-end scalabile basata su Thread Pool

Introduzione