Segue i precedenti post e post dove illustro una soluzione scalabile per l'implementazione di una procedura di elaborazione di back-end con utilizzo del thread pool.

Soluzione basata su Thread Pool con .NET Framework 4.0

Ho realizzato la soluzione basata il .NET Framework 4.0 utilizzando le nuove classi disponibili nel namespace System.Threading.Tasks. Il problema della schedulazione è stato facilmente risolto in quanto ho ereditato una soluzione ottimale direttamente dal sito msdn basata su questo articolo How to: Create a Task Scheduler That Limits the Degree of Concurrency nel quale è disponibile una classe di esempio che estende la classe TaskScheduler e implementa il controllo del limite di concorrenza dei task eseguti nel pool.

La versione sottoclasse di TaskNet che implementa l'esecuzione effettiva che si occupa di avviare i task risulta relativamente semplice, essendo costruita sulle nuove classi System.Threading.Tasks, ma soprattuto non richiede impegno aggiuntivo per scrivere classi particolari di sincronizzazione.

    public class EngineThreadPool2 : Engine

    {

        ...

        private readonly CancellationTokenSource cancellationSource = new CancellationTokenSource();

        ... 

        protected override void MainLoop()

        {

            TaskScheduler taskScheduler = null;

 

            if (numThreads.HasValue)

                taskScheduler = new LimitedConcurrencyLevelTaskScheduler(numThreads.Value);

 

            TaskFactory factory = new TaskFactory(taskScheduler);

 

            List<Task> enqued = new List<Task>();

 

            foreach (TaskFakeMixed task in TaskProvider.Tasks)

            {

                TaskFakeMixed taskToExecute = task;

 

                enqued.Add(factory.StartNew(() =>

                                                {

 

                                                    taskToExecute.ProcessTask();

                                                    UpdateStatistics(taskToExecute);

                                                }, cancellationSource.Token));

            }

 

            try

            {

                Task.WaitAll(enqued.ToArray(), cancellationSource.Token);

            }

            catch (OperationCanceledException)

            {

            }

        }

        ... 

        protected override void RequestStopTasks()

        {

            cancellationSource.Cancel();

        }

        ...

    }

Notare l'uso della classe TaskFactory la quale prevede un costruttore che accetta una classe TaskScheduler, nel caso sia passatto null viene utilizzato lo scheduler di default. La creazione del Task avviene attraverso il metodo TaskFactory.StartNew(), in questo caso è stato utilizzato l'overload che prevede in ingresso un Action, delegato che contiene il corpo del task, e un oggetto di tipo CancellationTokenSource che permette di notificare la cancellazione dell'esecuzione al task. Una volta che tutti i task sono stati creati e inseriti nel pool per l'esecuzione, il thread viene messo in attesa attraverso il metodo Task.WaitAll(...) al quale viene passato il vettore dei task e il CancellationToken per interrompere l'esecuzione. Quando l'utente decide di fermare il processo la form chiama il metodo RequestStopTasks() dell'engine che a sua volta invoca CancellationTokenSource.Cancel() il quale comunica la richiesta di cancellazione al token, e fa generare l'eccezione OperationCanceledException a Task.WaitAll(...) sbloccando l'esecuzione.

Soluzione alternativa con l'uso delle Parallel Extensions del .NET Framework 4.0

Una soluzione alternativa è quella di utilizzare le estensioni per l'esecuzione parallela, cosidette Parallel Extensions, di cui ci omaggia il .NET Framework 4.0. In questo caso il cuore del motore di schedulazione, la funzione Engine.MainLoop(), risulta molto semplice, di seguito riporto il codice.

    public class EngineParallelExt : Engine

    {

       ... 

        protected override void MainLoop()

        {

            if(numThreads.HasValue)

            {

                ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = numThreads.Value };

                Parallel.ForEach(TaskProvider.Tasks, options, (task, state) => ExecuteTask(state, task));

            }

            else

            {

                Parallel.ForEach(TaskProvider.Tasks, (task, state) => ExecuteTask(state, task));

            }

        }

 

        private void ExecuteTask(ParallelLoopState state, TaskFakeMixed task)

        {

            task.ProcessTask();

 

            UpdateStatistics(task);

 

            if (IsRequestStop)

                state.Break();

        }

       ... 

    }

La funzione chiave è il metodo statico Parallel.ForEach dotato di numerosi overloads, in particolare nella soluzione proposta, se è stato specificato un limite di parallelismo, viene utilzzata anche un istanza della classe ParallelOptions che ci mette a disposizione una proprietà MaxDegreeOfParallelism per definire il massimo parallelismo da applicare nell'esecuzione dei task, altrimenti le Task Parallel Library utilizzano il modello di partizione e le opzoni predefinite (vedi Data Parallelism (Task Parallel Library)). L'overload utilzzato è quello che prevede la collezione di elementi e un Action con due parametri, questi ultimi sono un ParallelLoopState e un TSource che rappresenta il tipo di item della collezione di origine. L'oggetto di tipo ParallelLoopState viene utilizzato per terminare l'esecuzione dell'elaborazione parallela attravereso il metodo Break().

Indice post - Soluzione di back-end scalabile basata su Thread Pool

Introduzione
Soluzione .NET 3.5