niedziela, 17 lutego 2013

[C#|Visual Studio] Threading: PLINQ

.NET Framework w wersji 4.0 udostępnił programistom nową bibliotekę - PFX (Parallel Framework). Wprowadza ona nową jakość w programowaniu wielowątkowym. Jednym z najważniejszych osiągnięć PFX jest PLINQ - mechanizm oferujący automatyzację wszystkich kroków w zrównoleglaniu obliczeń. Kroki te to:
  • podział zadań na Taski 
  • wykonywanie Tasków na wątkach
  • zebranie wszystkich wyników w całość
Parallel LINQ to zatem nic innego, jak równoległa implementacja dobrze znanego LINQ to objects. Kluczem do użycia PLINQ jest specjalna metoda AsParallel(). To dzięki niej, w sposób deklaratywny programista wprowadza zrównoleglenie obliczeń. Prosty przykład to poszukiwanie liczb pierwszych, gdzie podczas filtrowania wykonuje się proste operacje matematyczne.

static void PrimeNumbers()
{
    IEnumerable<int> numbers = Enumerable.Range(3, 100000 - 3);

    var parallelQuery =
      from n in numbers.AsParallel()
      where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
      select n;

    int[] primes = parallelQuery.ToArray();
    foreach (var prime in primes)
    {
        Console.Write("{0} ", prime);
    }
}

Zapytania PLINQ, podobnie jak każde inne LINQ są wykonywane z opóźnieniem, w momencie, gdy np. wywołamy ToArray, pętlę foreach itd. Równoległość obliczeń powoduje jednak, że wyniki będą nieuporządkowane. Jeżeli zależy nam na tym, by wynik wyglądał tak, jak w sekwencyjnym zapytaniu, możemy dodatkowo wywołać metodę AsOrdered()

from n in numbers.AsParallel().AsOrdered()

Wywołania takie wpływa oczywiście na pogorszenie wydajności, dlatego takie zachowanie nie pojawia się domyślnie.

PLINQ optymalizuje zrównoleglenie pod kątem możliwości sprzętowych. To jedna z największych zalet zwłaszcza pod kątem przyszłości, gdzie liczba rdzeni w komputerach PC będzie ciągle wzrastać. W przypadku niektórych komputerów, PLINQ może wykonać obliczenia sekwencyjnie pomimo użycia AsParallel(), np. dlatego, że zrównoleglenie może pogorszyć wydajność. Programista ma możliwość wymuszenia równoległości instrukcją

from n in numbers.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)

Kolejną ciekawą instrukcją jest WithDegreeOfParallelism(), gdzie podajemy ilość Tasków, na których ma być przetwarzane zapytanie. Szczególnie przydatne jest to w przypadku odpytywania WebSerwisów, gdzie tracimy czas nie na obliczeniach na CPU, ale na oczekiwaniu na odpowiedź.

from n in numbers.AsParallel().WithDegreeOfParallelism(6)

piątek, 15 lutego 2013

[C#|Visual Studio] Threading: Timers

Timery dostępny w świecie .NET można podzielić na takie, które działają w obrębie jednego wątku oraz na timery wielowątkowe. Wybór zależy oczywiście głównie od potrzeb, ale także, w przypadku zegarków jednowątkowych od technologii. Mamy więc:
  • System.Windows.Forms.Timer w Windows Forms
  • System.Windows.Threading.DispatcherTimer w WPF

Multithreaded Timers

Najprostszym timerem do pracy z wieloma wątkami jest System.Threading.Timer, który składa się jedynie z konstruktora i dwóch metod.

static void ThreadingTimer()
{
    Timer tmr = new Timer (Tick, "tick...", 5000, 1000);
    Console.ReadLine();
    tmr.Dispose();
}

static void Tick (object data)
{
    Console.WriteLine (data);
}

W konstruktorze przekazujemy callback cyklicznie wywoływany, parametr tego callbacka, następnie interwał po którym pojawi się pierwsze wywołanie i interwał pomiędzy kolejnymi wywołaniami.

Bardziej rozbudowaną wersją jest Timer umiejscowiony w namespace System.Timers.  Rozszerzono API o kilka opcji zmieniając konstruktor na bezparametrowy. Ustawienia podajemy przez properties,  a ponadto możemy go wyłączać przez property Enabled, a także zatrzymywać odliczanie używając metody Stop. Sekwencja Start - Stop - Start resetuje odliczony czas.

static void TimersTimer()
{
    var tmr = new System.Timers.Timer();
    tmr.Interval = 5000;
    tmr.Elapsed += tmr_Elapsed;
    tmr.Start();
    Console.ReadLine();
    tmr.Stop(); 
    Console.ReadLine();
    tmr.Start();
    Console.ReadLine();
    tmr.Dispose();
}

static void tmr_Elapsed(object sender, EventArgs e)
{
    Console.WriteLine("Tick");
}

Oba powyższe timery wykorzystują pulę wątków, co umożliwia uruchomienie wielu ich instancji z różnych wątków. Interwał pomiędzy kolejnymi wykonaniami callbacka nie zależy od czasu wykonywania samego callbacka, a dokładność w obliczaniu czasu szacuje się na 10-20 ms.

Timery używane w WinForm i WPF dostarczają podobne API, jednak warto pamiętać, że są dedykowane do pracy w swoich środowiskach. Timery te nie wykorzystują puli wątków, natomiast w znacznej mierze zależą od mechanizmów UI, tzw. "message pumping". Zatem zdarzenie tick jest dostępne w tym samym wątku, z którego wystartowano zegarek.

środa, 13 lutego 2013

[C#|Visual Studio] Threading: BackgroundWorker

Aplikacje pisane w takich technologiach jak WinForms czy WPF wymagają, aby interfejs użytkownika był responsywny niemal przez cały czas. Zatem wszystkie kosztowne czasowo operacje powinny być wykonywane asynchronicznie. Klasą, która umożliwia łatwe przeprowadzanie takich operacji jest BackgroundWorker. Klasa ta dostarcza kilka gotowych mechanizmów, takich jak:
  • łatwa możliwość przerwania pracy
  • możliwość bezpiecznej aktualizacji kontrolek WPF i Formsowych
  • przekazywanie wyjątków do wątku nadrzędnego
  • możliwość raportowania o stanie pracy workera
Poniższy przykład obrazuje wszystkie możliwości BackgroundWorkera:

static BackgroundWorker _bw;

public static void Run()
{
    _bw = new BackgroundWorker
    {
        WorkerReportsProgress = true,
        WorkerSupportsCancellation = true
    };
    _bw.DoWork += bw_DoWork;
    _bw.ProgressChanged += bw_ProgressChanged;
    _bw.RunWorkerCompleted += bw_RunWorkerCompleted;
    _bw.RunWorkerAsync ("Hello to worker");
    Console.WriteLine ("Press Enter in the next 5 seconds to cancel");
    Console.ReadLine();
    if (_bw.IsBusy) _bw.CancelAsync();
    Console.ReadLine();
}

static void bw_DoWork (object sender, DoWorkEventArgs e)
{
    for (int i = 0; i <= 100; i += 20)
    {
        if (_bw.CancellationPending) { e.Cancel = true; return; }
        _bw.ReportProgress (i);
        Thread.Sleep (1000);
    }
    e.Result = 123;
}

static void bw_RunWorkerCompleted (object sender,
                                    RunWorkerCompletedEventArgs e)
{
    if (e.Cancelled)
        Console.WriteLine ("You canceled!");
    else if (e.Error != null)
        Console.WriteLine ("Worker exception: " + e.Error.ToString());
    else
        Console.WriteLine ("Complete: " + e.Result);      // from DoWork
}

static void bw_ProgressChanged (object sender,
                                ProgressChangedEventArgs e)
{
    Console.WriteLine ("Reached " + e.ProgressPercentage + "%");
}

Ustawienie odpowiednich propercji i zapięcie callbacków na zdarzenia jest konieczne w przypadku, gdy chcemy mieć możliwość anulowania i raportowania stanu pracy workera. Obie możliwości są opcjonalne. Dane pomiędzy wątkami przekazywane są poprzez odpowiednie EventArgs.

wtorek, 12 lutego 2013

[C#|Visual Studio] Threading: Event Wait Handles

Event Wait Handles wykorzystywane są do sygnalizacji pomiędzy wątkami, a więc w sytuacji gdy jeden wątek oczekuje na sygnał od innego. Mamy do dyspozycji kilka typów w C# 4.0

AutoResetEvent

Pozwala na powiadomienie jednego oczekującego wątku. Wątek czeka na sygnał. Oczekiwanie rozpoczyna wywołanie metody WaitOne, natomiast wątek notyfikowany jest za pomocą metody Set.

static EventWaitHandle _waitHandle = new AutoResetEvent(false);

static void RunAutoResetEvent()
{
    new Thread(Waiter).Start();
    Thread.Sleep(1000);
    _waitHandle.Set();
}

static void Waiter()
{
    Console.WriteLine("Waiting...");
    _waitHandle.WaitOne();
    Console.WriteLine("Notified");
}

Jeżeli metoda Set zostanie wywołana podczas gdy żaden wątek nie oczekuje, dopuszczony do działania zostanie pierwszy wątek, który wywoła metodę WaitOne. Metoda ta przyjmuje opcjonalnie jako parametr wartość timeoutu.

ManualResetEvent

Działa podobnie jak AutoResetEvent, z tą różnicą, że ustawienie Set powoduje, iż przepuszczony zostaje każdy wątek wołający WaitOne, natomiast Reset powoduje, że wątki czekają na powiadomienie.

CountdownEvent

Pozwala zaczekać na więcej niż jeden wątek, liczbę wątków, które muszą powiadomić określa się w konstruktorze. Wątki wysyłają sygnały poprzez metodę Signal.

static CountdownEvent _countdown = new CountdownEvent(3);

static void RunCountDownEvent()
{
    new Thread(SaySomething).Start("I am thread 1");
    new Thread(SaySomething).Start("I am thread 2");
    new Thread(SaySomething).Start("I am thread 3");
    _countdown.Wait();
    Console.WriteLine("All threads have finished speaking!");
}

static void SaySomething(object thing)
{
    Thread.Sleep(1000);
    Console.WriteLine(thing);
    _countdown.Signal();
}

Wywołanie metody Reset spowoduje przywrócenie licznikowi wyjściowej postaci.

poniedziałek, 11 lutego 2013

[C#|Visual Studio] Threading: Synchronizacja

Synchronizacja wątków jest szczególnie ważna, gdy korzystają one z tych samych zasobów, jednak niezsynchronizowane wątki mogą także w wielu przypadkach doprowadzić do nieprzewidywalnego zachowania programu. Konstrukcje koordynujące wątki dzieli się na cztery kategorie:

Blokowanie

Wątek zablokowany, to taki, którego wykonywanie z jakiegoś powodu zostało wstrzymane, na przykład poprzez operację Sleep bądź Join względem innego wątku. Wątek w stanie zablokowania oddaje swoją część procesora do momentu wznowienia. Odblokować wątek można na cztery sposoby:
  • warunek wznowienia zostaje spełniony
  • przekroczony timeout (jeżeli został ustawiony)
  • przerwanie przez Thread.Interrupt
  • wykonanie instrukcji Thread.Abort
Podczas blokowania wątków można spotkać także tzw. spinning, technikę bardzo niefektywną dla CPU polegającą na ciągłym sprawdzaniu pewnego warunku. Nieco lepszym rozwiązaniem jest usypianie wątku na krótki czas. Technika ta jest efektywna jedynie dla krótkich blokowań, rzędu milisekund, gdyż wtedy nie zachodzi konieczność cennego przełączania kontekstu.

while (!proceed) Thread.Sleep (10);

Stan wątku jest dostępny w każdej chwili za pomocą property ThreadState. Możliwe stany wątków przedstawione są na diagramie poniżej.


Wątki zostają zablokowane na przykład, gdy użyjemy instrukcji lock. Instrukcja ta jest skrótem składniowym odpowiadającym instrukcjom Monitor.Enter oraz Monitor.Exit. Obiekt na którym wykonywana jest blokada musi dziedziczyć po typie referencyjnym.

static readonly object _locker = new object();
static int _x;

static void Increment() 
{ 
    lock (_locker) _x++;
}

static void Decrement()
{
    lock (_locker) _x--;
}

public static void Run()
{
    for (int i = 0; i < 5; i++)
    {
        Task.Factory.StartNew(Increment);
        Task.Factory.StartNew(Decrement);
    }
}

Lock jest instrukcją atomową, chyba że w jego wnętrzu zostanie rzucony wyjątek.Podobny efekt można uzyskać stosując Mutex. Konstrukcja ta mimo iż jest kilkadziesiąt razy wolniejsza, ma jedną ważną przewagę nad lockiem - można za jej pomocą wykonywać blokady pomiędzy procesami.

static void RunMutex()
{
    using (var mutex = new Mutex(false, "Sample Mutex String"))
    {
        if (!mutex.WaitOne(TimeSpan.FromSeconds(3), false))
        {
            Console.WriteLine("Another app instance is running. Bye!");
            return;
        }
        RunProgram();
    }
}

static void RunProgram()
{
    Console.WriteLine("Running. Press Enter to exit");
    Console.ReadLine();
}

Nazwa Mutex-a pozwoli mu blokować procesy w obrębie całego komputera. Jeżeli powyższy program zostanie uruchomiony w dwóch instancjach, to druga instancja poczeka 3 sekundy i zakończy swoją pracę, jeżeli kod pierwszej instancji nie wykona się (nie zostanie wciśnięty Enter).

Semafor

Semafor działa jak lock, z tą różnicą że jest w stanie obsłużyć więcej niż jeden wątek. Liczba wątków definiowana jest przez programistę, a każdy nadmiarowy wątek musi poczekać, aż któryś z poprzedników skończy swoje działanie.

static SemaphoreSlim _sem = new SemaphoreSlim(3);
public static void Run()
{
    for (int i = 1; i <= 5; i++) 
        new Thread(Enter).Start(i);
}

static void Enter(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.Wait();
    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000*(int) id);
    Console.WriteLine(id + " is leaving");
    _sem.Release();
}

SemaphoreSlim to klasa z .NET 4.0 zoptymalizowana pod kątem czasu wykonywania operacji Wait i Release.

sobota, 9 lutego 2013

[C#|Visual Studio] Threading: Thread Pool

Pula wątków rozwiązuje dwa dosyć poważne problemy związane ze współbieżnością w .NET:
  • każdy wątek domyślnie wykorzystuje około 1MB pamięci
  • utworzenie nowego wątku wraz z organizacją pamięci dla niego trwa setki milisekund
Dzięki puli wątków zyskujemy automatyczne współdzielenie i odzyskiwanie już pracujących wątków. Ponadto kontrolowana jest maksymalna ilość wątków typu "worker" jakie mogą pracować w danej chwili. Zbyt duże ilości aktywnych wątków mogą zaburzać pracę systemu operacyjnego, co staje się niewygodne dla użytkownika. Gdy zastosuje się Thread Pool, to w przypadku przekroczenia maksymalnej liczby wątków, nowe wątki są kolejkowane i uruchamiane dopiero w momencie, gdy któreś z aktywnych wątków skończą swoją pracę.

Przed rozpoczęciem pracy z pulą wątków należy pamiętać, że będą one zawsze wątkami typu "background".

Dostęp do puli wątków można uzyskać na kilka sposobów:

Pula wątków poprzez Task Parallel Library (TPL)

TPL dostarcza specjalną klasę Task, która dostępna jest pod dwoma postaciami: generyczną i niegeneryczną. Wątek z Thread Pool można wystartować w poniższy sposób:

public static void Run()
{
    Task.Factory.StartNew(NonGenericTask);
}

public static void NonGenericTask()
{
    Console.WriteLine("This is thread from thread pool");
}

lub w sposób generyczny, gdzie możemy zwracać wartość z wątku.

public static void Run()
{
    Task<string> downloader = Task.Factory.StartNew(
        () => GenericTask("http://www.wisla.krakow.pl"));
    Console.WriteLine("Some operations...");
    var result = downloader.Result;
    Console.WriteLine(result);
}

public static string GenericTask(string uri)
{
    Console.WriteLine("This is thread from thread pool");
    using (var wc = new System.Net.WebClient())
        return wc.DownloadString(uri);
}

Warto pamiętać, że w momencie, gdy odwołujemy się do property Result, wątek główny zostanie zawieszony do momentu wywołania wątku z puli.

TPL został wprowadzony w .NET 4.0. W starszych wersjach zachodzi konieczność korzystania z puli wątków w nieco inny sposób.

Pula wątków poprzez klasę ThreadPool

Tutaj mamy do dyspozycji klasę ThreadPool, gdzie wywołując wątek możemy przez parametr podać stan (jako typ object).

public static void Run()
{
    Console.WriteLine("Without TPL");
    ThreadPool.QueueUserWorkItem(QueueUserWorkItemFcn, 
        DateTime.Today.ToShortDateString());
}

public static void QueueUserWorkItemFcn(object dateTime)
{
    Console.WriteLine("This is thread from thread pool, at {0}", dateTime);
}

Jeżeli chcemy zwracać obiekty z wątku nie używając TPL, należy wykorzystać tzw. asynchronous delegates.

public static void Run()
{
    Console.WriteLine("Asynchronous delegate");
    Func<string, string> func = new Func<string, string>(AsynchronousDelegate);
    IAsyncResult value = func.BeginInvoke("this is some text", null, null);
    Console.WriteLine("Some operations...");
    var str = func.EndInvoke(value);
    Console.WriteLine(str);
}

public static string AsynchronousDelegate(string letters)
{
    Console.WriteLine("This is thread from thread pool");
    IEnumerable<char> rev = letters.Reverse();
    string result = "";
    foreach (var @char in rev)
    {
        result += @char;
    }
    return result;
}

Operacja EndInvoke czeka aż asynchroniczny delegat wykona swoje zadanie, zawieszając główny wątek, odbiera rezultat przetwarzania, a także "przerzuca" wyjątek z wątku w tle do wątku głównego.

Wywołując BeginInvoke można jako drugi parametr przekazać callback, który wykona się po zakończeniu pracy wątku. Callback jako parametr przyjmuje typ IAsyncResult.

public static void Run()
{
    var method = new Func<string, string>(AsynchronousDelegate);
    method.BeginInvoke("My text", Callback, method);
}

public static void Callback(IAsyncResult result)
{
    var target = (Func<string, string>)result.AsyncState;
    string res = target.EndInvoke(result);
    Console.WriteLine(res);
}

sobota, 2 lutego 2013

[C#|Visual Studio] Threading: Podstawy

W języku C#, niezależnie od tego, czy tworzymy aplikację konsolową, WinFormsową czy WPFową, domyślnie operacje zawsze będą się wykonywały w jednym wątku, tworzonym przez CLR. Wątkami w .NEcie zarządza specjalny mechanizm zwany thread scheduler. Na jednordzeniowej maszynie wielowątkowość realizowana jest poprzez podział czasu pracy procesora, natomiast na wielordzeniowej wątki uruchamiane są na wszystkich rdzeniach, jednak należy pamiętać, że system operacyjny czy też inne uruchomione programy również korzystają z CPU, tak więc podział czasu i przełączanie wątków w praktyce odbywa się zawsze. Czas przełączenia szacuje się na dziesiątku milisekund.

Najprostszym sposobem na zrównoleglenie wykonywanych operacji jest klasa Thread. Poniższy przykład pokazuje, jak przy jej użyciu na wieloprocesorowej maszynie wykonywać operacje naprzemiennie.

public static void Run()
{
    Thread t = new Thread(WriteY);          
    t.Start();
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(i);
        Console.Write("x");
    } 
        
}

static void WriteY()
{
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(i);
        Console.Write("y");
    }
}

Warto pamiętać, że dla każdego wątku tworzony jest osobny memory stack. Informacje pomiędzy wątkami mogą być przekazywane np. poprzez zmienne statyczne. Nieumiejętne ich wykorzystanie może jednak spowodować poważne problemy. Aby uniknąć np. sytuacji, w której dwa wątki zapisują w tym samym czasie dane do jednej zmiennej, należy użyć tzw. Exclusive Lock. Użycie słowa kluczowego lock powoduje, że tylko jeden wątek w danej chwili może korzystać z zasobu, drugi musi poczekać.

public class LockTest
{
    static bool done;
    static readonly object locker = new object();

    public static void Run()
    {
        new Thread(Go).Start();
        Go();
    }

    static void Go()
    {
        lock (locker)
        {
            if (!done) { Console.WriteLine("Done"); done = true; }
        }
    }
}

Blokowany wątek nie zużywa zasobów CPU.

Kolejną ważną operacją jest Join, służący do synchronizowania kolejności wykonywania operacji. Jeżeli z poziomu jednego wątku startujemy inny podrzędny, możemy zażądać aby dalsze wykonywanie kodu wstrzymać do zakończenia działania tego podrzędnego wątku.

public class JoinTest
{
    public static void Run()
    {
        Thread t = new Thread(Go);
        t.Start();
        for (int i = 0; i < 100; i++) Console.Write("x");
        t.Join();
        Console.WriteLine("Thread t has ended!");
    }

    static void Go()
    {
        for (int i = 0; i < 1000; i++) Console.Write("y");
    }
}

Wątki można nazywać, co przydaje się podczas debugowania w Visual Studio. Można im także nadawać priorytety. Nadanie priorytetu decyduje o tym, ile czasu procesor poświęci na dany wątek. Domyślny priorytet to Medium.

public class ThreadNamePriorityTest
{
    public static void Run()
    {
        Thread.CurrentThread.Name = "Main Thread";
        Thread worker = new Thread(Go);
        worker.Name = "Worker";
        worker.Priority = ThreadPriority.BelowNormal;
        worker.Start();
        Go();
    }

    static void Go()
    {
        Console.WriteLine("Hello from " + Thread.CurrentThread.Name);
    }
}