.NET: xxxSlim
◼
Obiekty pracujace w obrębie jednego procesu
◼
SemaphoreSlim (5x(?) mniejszy narzut)
◼
ReaderWriterLock zastąpiony przez
ReaderWriterLockSlim
Synchronizacja w trybie użytkownika
◼ Dużo mniej kosztowna niż wykorzystanie obiektów jądra
◼ Możliwe jest synchronizowanie tylko wątków tego samego procesu
◼ Funkcje atomowe:
• InterlockedIncrement,
• InterlockedDecrement,
• InterlockedExchancheAdd,
• InterlockedExchanche, (32bit)
• InterlockedExchanchePointer, (64bit)
• InterlockedCompareExchanche
◼ .NET - class Interlooked
• CompareExchange, Decrement, Exchange, Increment
Sekcja krytyczna
◼ Synchronizacja w trybie użytkownika
◼ Możliwe jest synchronizowanie tylko wątków tego samego procesu
◼ Uwaga wykonanie „wait” może spowodować – zmianę trybu
Typ obiektu: CRITICAL_SECTION Operacje na sekcji krytycznej:
• InitializeCriticalSection inicjalizacja sekcji krytycznej
• DeleteCriticalSection zwolnienie sekcji krytycznej
• EnterCriticalSection wejście do sekcji krytycznej
• LeaveCriticalSection wyjście z sekcji krytycznej
• TryEnterCriticalSection sprawdzenie sekcji krytycznej
◼ .NET: Monitor
Sekcja krytyczna – C# - lock
Kontekst pozwala identyfikować poszczególne instancje sekcji krytycznej
Aby uzyskać globalny kontekst można skorzystać z konstrukcji typeof np.
lock(typeof(myObject))
◼ synchronizowalne wrapery do zmiennych np.
Hashtable myHT = new Hashtable();
Hashtable mySyncHT = Hashtable.Synchronized(myHT);
System.Threading.Monitor.Enter(x);
try { ... } finally {
System.Threading.Monitor.Exit(x);
} lock (x) {
....
}
Dostęp do zmiennych
◼ Thread Local Storage
LocalDataStoreSlot lds =
Thread.GetNamedDataSlot("COOKIE");
Thread.SetData(lds, "abc");
var napis = (string) Thread.GetData(lds);
◼ Synchronizowalne wrapery do zmiennych np.
Hashtable myHT = new Hashtable();
Hashtable mySyncHT = Hashtable.Synchronized(myHT);
lock(mySyncHT.SyncRoot) {
foreach (Object item in mySyncHT) { // to do smthg }
Obiekty synchronizacyjne
Synchronizacja w trybie jądra jest dużo bardziej kosztowna
◼ Odmiany semaforów
Mutex – semafor binarny
Semaphore – semafo wielowartościowy
Event – zdarzenie
Waitable timer – budzik: NT, 2K, XP
powiadomienie o zmianie w systemie plików
◼ Obiekty, które mają własności semaforów:
proces, wątek - sygnalizowane po zakończeniu
zadanie - sygnalizowane po wyczerpaniu limitu czasu
plik - sygnalizowane gdy nie trwają operacje we/wy
wejście konsoli - sygnalizowane gdy są jakieś znaki w buf.
Nazwy obiektów synchronizacyjnych
◼
korzystanie z jednego obiektu przez różne procesy
◼
małe i duże litery są rozróżniane
◼
długość nieprzekraczająca _MAX_PATH
◼
znaki dozwolone takie jak dla nazwy pliku (bez \)
◼
obiekty synchronizacyjne dzielą tę samą przestrzeń nazw
◼
przestrzeń nazw dla obiektów
synchronizacyjnych jest rozłączna z przestrzenią
nazw systemu plików
Obiekt synchronizacyjny
◼ obiekt przyjmujący stany
signaled – semafor podniesiony
not signaled – semafor opuszczony
◼ operacje
zasygnalizowanie (podniesienie semafora - funkcje dedykowane dla typu obiektu
likwidacja stanu zasygnalizowanego
(opuszczenie semafora) – funkcje wspólne
dla wszystkich typów obiektów
Opuszczanie semafora
◼ opuszczenie pojedynczego semafora
DWORD WaitForSingleObject (HANDLE hObject, DWORD
dwMiliseconds) ; INFINITE
◼ opuszczanie zbioru semaforów
DWORD WaitForMultipleObjects (DWORD nCount, CONST HANDLE*
lpHandles, BOOL bWaitAll, DWORD dwMiliseconds)
◼ Oczekiwanie przerywane komunikatami
MsgWaitForMultipleObjects
◼ Oczekiwanie przerywane operacjami I/O
WaitForSingleObjectEx((HANDLE hObject, DWORD dwMiliseconds, BOOL bAlertable) ;
WaitForMultipleObjectsEx, MsgWaitForMultipleObjectsEx SignalObjectAndWait(HANDLE hObjectToSignal,
HANDLE hObjectToWait, DWORD dwMiliseconds, BOOL bAlertable) ;
Opuszczanie semafora .NET
class WaitHandle {
public virtual Boolean WaitOne();
public static Boolean WaitAll(WaitHandle[]);
public static Boolean WaitAny(WaitHandle[]);
…
public virtual Boolean WaitOne(int, bool);
public virtual Boolean WaitOne(TimeSpan, bool);
public virtual Boolean SignalAndWait (WaitHandle, WaitHandle) public virtual Boolean SignalAndWait (WaitHandle, WaitHandle, TimeSpan, Boolean);
public virtual IntPtr Handle { get; set; }
public SafeWaitHandle SafeWaitHandle { get; set; } };
Opuszczanie semafora .NET
class ManualResetEvent : WaitHandle;
class AutoResetEvent : WaitHandle;
class Mutex : WaitHandle;
class Semaphore : WaitHandle;
DeadLock
object l1 = new object();
object l2 = new object();
new Thread (() => { lock (l1) {
Thread.Sleep (1000);
lock (l2); // Deadlock }
}).Start();
lock (l2) {
Thread.Sleep (1000);
lock (l1); // Deadlock }
DeadLock - zapobieganie
◼ Zajmowanie zasobów w takiej samej kolejności
◼ Użycie WaitAll
Leniwa inicjalizacja
class Foo {
Expensive _expensive;
public Expensive expensive { // Lazily instantiate Expensive get {
if (_expensive == null) _expensive = new Expensive();
return _expensive;
} }
...
}
lock
Mutex
◼ Semafor binarny - obiekt dwustanowy
◼ Umożliwia synchronizację wątków dowolnych procesów
◼ Podniesienie semafora binarnego wznawia tylko jeden wątek
◼ Mutex jest własnością wątku:
wątek nie czeka na zajętym już przez siebie Mutexie, ale trzeba odpowiednią liczbę razy wołać ReleaseMutex)
podnieść Mutexa może tylko wątek, który go opuścił (dla innych wątków ReleaseMutex nie daje efektu)
zakończenie wątku będącego włąścicielem muteksa podnosi ten semafor -> wynik WaitFor... = WAIT_ABANDONED
◼ Operacje na semaforze binarnym
• CreateMutex (PSECURITY_ATRIBUTE psa, BOOL bInitialOwned, PCSTR pszName)
• OpenMutex
• ReleaseMutex podniesienie, opuszczanie semafora: WaitFor...
◼ .NET: Mutex
Semaphore
◼ Semafor wielowartościowy cechuje ograniczona liczba stanów (0 oznacza semafor opuszczony)
◼ Próba opuszczenia semafor zmniejsza licznik o 1
◼ Podniesienie semafora zwiększa licznik i wznawia tyle wątków, o ile został zmniejszony licznik
◼ Opuszczenie i podniesienie może być wykonane przez różne wątki
◼ operacje na semaforze wielowartościowym
• CreateSemaphore (PSECURITY_ATRIBUTE psa,
LONG lInitialCount, LONG lMaximumCount, PCSTR pszName)
• OpenSemaphore
• ReleaseSemaphore podniesienie
◼ .NET: Semaphore, SemaphoreSlim (>=.Net 4.0 – zalecany, pracuje hybrydowo tj. nie uzywa obj. jądra póki nie jest to
konieczne)
Event
◼ Zdarzenie - obiekt dwustanowy, służący do sygnalizowania zajścia wydarzenia
◼ Synchronizacja wątków dowolnych procesów
◼ Typy zdarzeń
manual-reset event - podniesienie wznawia wszystkie wątki
automatycznie opuszczane - podniesienie wznawia zawsze jeden wątek
◼ Operacje na zdarzeniu
• CreateEvent (PSECURITY_ATRIBUTE psa,
BOOL bManual, BOOL bInitial, PCSTR pszName)
• OpenEvent
• SetEvent - podniesienie
• ResetEvent - opuszczenie
• PulseEvent - podniesienie i opuszczenie - zwalnia wszystkie czekające wątki (manual) lub jeden(auto)
◼ .Net: AutoResetEvent, ManualResetEvent
AutoResetEventSlim, ManualResetEventSlim (.Net 4.5)
.NET: CountdownEvent
F O R K
J O I N
Master Thread
Parallel Region
Master Thread
CountdownEvent jest obiektem synchronizacyjnym który pozwala śledzić wykonanie wiekszej liczby zadań i
sygnalizować ich zakończenie.
CountdownEvent - Kod
public static void DoShoping(int id) { Thread.SpinWait(2000000);
Console.WriteLine("Customer {0} finished",id);
}
var syncEvent = new CountdownEvent(1);
foreach (int id in Enumerable.Range(1,20)) { int currentId = id;
syncEvent.AddCount();
ThreadPool.QueueUserWorkItem(delegate { DoShoping(currentId);
syncEvent.Signal(); });
}
syncEvent.Signal();
syncEvent.Wait();
Console.WriteLine("All customers finished Shopping");
.NET: Barrier
Mac
Charlie
Dennis
Gas Station = Barrier
Bost on
Barrier jest obiektem synchronizacyjnym, który pozwala na zatrzymanie wykonania większej liczby wątków w określonym punkcie dopóki nie zostanie on osiagnięty przez wszystkie
wątki
Barrier - kod
var delay = TimeSpan.FromSeconds(1);
var charlie = new Thread(() => DriveToSeattle("Charlie“, delay));
charlie.Start();
var mac = new Thread(() => DriveToSeattle("Mac", delay));
mac.Start();
var dennis = new Thread(() => DriveToSeattle("Dennis", delay));
dennis.Start();
charlie.Join();
mac.Join();
dennis.Join();
Barrier - kod
static void DriveToSeattle(string name, TimeSpan timeToGasStation) {
// Drive to gas station
Console.WriteLine("[{0}] Leaving House", name);
Thread.Sleep(timeToGasStation);
Console.WriteLine("[{0}] Arrived at Gas Station", name);
// Need to sync here
// Perform some more work
Console.WriteLine("[{0}] Leaving for Seattle", name);
}
Barrier - kod
static Barrier sync = new Barrier(3);
static void DriveToSeattle(string name, TimeSpan timeToGasStation) {
// Drive to gas station
Console.WriteLine("[{0}] Leaving House", name);
Thread.Sleep(timeToGasStation);
Console.WriteLine("[{0}] Arrived at Gas Station", name);
// Need to sync here
sync .SignalAndWait();
// Perform some more work
Console.WriteLine("[{0}] Leaving for Seattle", name);
}
.NET: ReaderWriterLockSlim
◼
Semafor dedykowany dla asymetrycznej sytuacji gdze istnieje możliwość wielu
odczytów vs. dostęp wyłączny
◼
operacje na semaforze
• IsReaderLockHeld, IsWriterLockHeld
• WriterSeqNum
• AcquireReaderLock, AcquireWriterLock
• AnyWritersSince
• UpgradeToWriterLock, DowngradeFromWriterLock
• ReleaseReaderLock, ReleaseWriterLock
• RestoreLock, ReleaseLock.
Powiadomienie o zmianie w systemie plików
◼ powiadomienie jest semaforem, który zmienia stan na
podniesiony w chwili wystąpienia zmiany w systemie plików
◼ zlecenie dokonania pierwszego powiadomienia HANDLE FindFirstChangeNotification(
LPTSTR lpszPath, // ścieżka
BOOL fWatchSubTree, // czy zmiana poddrzewa DWORD fdwFilter) ; // rodzaj zmiany
◼ FindNextChangeNotification - zlecenie dokonania kolejnego powiadomienia
◼ FindCloseChangeNotification - rezygnacja
◼ WaitFor... - oczekiwanie na powiadomienie
◼ .NET: System.IO.FileSystemWatcher
Inne metody synchronizacji
◼ WaitForInputIddle (HANDLE hProcess, DWORD wMillisecinds)
◼ .NET : System.Diagnostics.Process.WaitForInputIdle Czeka aż wątek główny przetworzy wszystkie
komunikaty (np. emulacja naciśnięć klawiszy)
WIELOZADANIOWOŚĆ W .NET
Koszt wątku:
◼
Pamięć:
Obiekt jądra (1.2 kB)
Thread environment block (4/8 kB 32/64b)
Stos (tryb użytkownika 1MB)
Stos (tryb jądra 12/24kB dla 32/64b)
◼
DllMain -> DLL_THREAD_ATTACH/
DLL_THREAD_DETACH
◼
Wniosek: wątki kosztują i nie należy ich
nadużywać
Wątki
Wątek CLR == wątek systemowy (jak dotąd) Start nowego wątku:
new Thread (() => { } ).Start();
new Thread (() => { } ).Start(startState);
Jaki będzie wynik dla:
for (int i = 0; i < 10; i++)
new Thread (() => Console.Write (i)).Start();
Wątki
Wątek CLR == wątek systemowy (jak dotąd) Start nowego wątku:
new Thread (() => { } ).Start();
new Thread (() => { } ).Start(startState);
Jaki będzie wynik dla:
for (int i = 0; i < 10; i++)
new Thread (() => Console.Write (i)).Start();
a dla
for (int i = 0; i < 10; i++) { int temp = i;
new Thread (() => Console.Write (temp)).Start();
}
Wątki
Kiedy używać:
długie zadania,
priorytet inny niż normal
zadania foreground
Pula wątków - .NET
◼
Obiekt systemowy: ThreadPool
◼
Uruchamianie dla void:
ThreadPool.QueueUserWorkItem (
notUsed => Console.WriteLine ("Msg from pool"));
◼
lub dla zwrotu (alternatywnie można zdef. f. callb.)
static int Work (string s) { return s.Length; }
…
Func<string, int> method = Work;
IAsyncResult cookie = method.BeginInvoke ("test", null, null);
…
int result = method.EndInvoke (cookie);
Console.WriteLine (“Result: " + result);
Pula wątków - .NET
◼
Kiedy używać:
krótkie zadania (<250ms, idealnie <100ms),
priorytet normal,
zadania background
ThreadPool.Set[Max|Min]Thread nowe wątki są
alokowane od Min do Max jeśli przez jakiś czas (0.5s) nie zmienia się kolejka zadań do wykonania.
◼
Używane przez : WCF, Remoting, ASP.NET, and ASMX Web Services, System.Timers.Timer i
System.Threading.Timer, BackgroundWorker
asynchr. delegaty
Wątki, pula
Pot. problemy:
Odebranie i przekazanie ew. wyjątków
Czekanie na wątek (synchronizacja) blokuje inny wątek
◼ Czekanie:
new Thread (() => Console.Write (“test”)).Start().Join();
◼ lub
var endOfWork = new CountdownEvent(10);
for (var i=1; i<=10; i++)
new Thread (() =>endOfWork.signal()).Start();
endOfWork.wait();
Czekanie inaczej
var threads = new list<Thread>();
for (var i=1; i<=10; i++) {
var t = new Thread (() =>endOfWork.signal());
t.Start();
threads.Add();
}
WaitHandle.WaitAny(threads, true); // false ?
Parallel Extensions .Net 4.x
◼
.NET Library
Mogą być wykorzystywane we wszystkich językach na platformie .NET
◼
Obejmuje 3 różne elementy
Parallel LINQ (PLINQ)
Task Parallel Library (TPL)
Coordination Data Structures (CDS)
Od watków do zadań (1)
static void Main() {
Tree tr = Tree.CreateSomeTree(9, 1);
Stopwatch sw = Stopwatch.StartNew();
WalkTree(tr);
Console.WriteLine("Elapsed= " + sw.ElapsedMilliseconds.ToString());
Console.ReadLine();
}
static void WalkTree(Tree tree) { if (tree == null) return;
WalkTree(tree.Left);
WalkTree(tree.Righ);
ProcessItem(tree.Data);
}
Od watków do zadań (2)
static void WalkTree(Tree tree) { if (tree == null) return;
Thread left = new Thread((o) => WalkTree(tree.Left));
left.Start();
Thread righ = new Thread((o) => WalkTree(tree.Righ));
righ.Start();
left.Join(); righ.Join();
ProcessItem(tree.Data);
}
Od watków do zadań (3)
static void WalkTree(Tree tree) {
if (tree == null) return;
Task left = new Task((o) => WalkTree(tree.Left));
left.Start();
Task righ = new Task((o) => WalkTree(tree.Righ));
righ.Start();
left.Wait();
righ.Wait();
ProcessItem(tree.Data);
}
Parallel LINQ-to-Objects
•
Wykorzystuje model "Task"•
Pozwala na wykorzystanie wielu rdzeni przy zapytaniach LINQ•
Wspiera w pełni standardowe operatory zapytań•
Minimalizuje wpływ na istniejące zapytania LINQvar q = from p in people
where p.Name == queryInfo.Name &&
p.State == queryInfo.State &&
p.Year >= yearStart &&
p.Year <= yearEnd orderby p.Year ascending select p;
.AsParallel()
(PLINQ)
PLINQ
IEnumerable<int> numbers =
Enumerable.Range (3, 100000-3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range (2, (int) Math.Sqrt (n)) .All (i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
◼ Dla operatorów akceptujących 2 sekwencje (Join, GroupJoin, Concat, Union, Intersect, Except, Zip) AsParalel musi być zaaplikowany do obu sekwencji
PLINQ
◼ Warto stosować:
Dla relatywnie prostych przypadków
W przypadku bardziej skomplikowanych rozwiazań to podejście może być mniej czytelne
Parallel Static Class
•
Gdy instrukcje są niezależne moga być zrównolegloneStatementA();
StatementB();
StatementC();
Parallel.Invoke(
() => StatementA(), () => StatementB(), () => StatementC() );
Parallel.For - Przykład
void NonParalelMethod() { for (int i=0; i<16; i++) {
Console.WriteLine(“TID={0}, i={1}”,
Thread.CurrentThread.ManagedThreadId, i);
SimulateProcessing(i);
} }
void ParalelMethod() {
Paralel.For (0, 16, i => {
Console.WriteLine(“TID={0}, i={1}”,
Thread.CurrentThread.ManagedThreadId, i);
SimulateProcessing(i);
} }
Paralel Invoke
public static void Invoke (params Action[] actions);
Parallel.Invoke (
() => new WebClient().DownloadFile ("http://www.wp.pl", “index.html"), () => new WebClient().DownloadFile
("http://www.pg.gda.pl", “index.html") );
public static void Invoke (ParallelOptions options, params Action[] actions);
Np. Cancelation token
Paralel Enumerable
public static void Invoke (params Action[] actions);
val w = ParallelEnumerable.Range (1, 10000000).Sum ( i => Math.Sqrt (i))
public static void Invoke (ParallelOptions options, params Action[] actions);
System.Collections.Concurrent
•
Przy przetwarzaniu wielozadaniowym należy wykorzystywać klasy kolekcji bezpiecznych ("thread-safe")•
ConcurrentStack<T>•
ConcurrentQueue<T>•
ConcurrentLinkedList<T>•
ConcurrentDictionary<TKey,TValue>•
ConcurrentBag<TKey,TValue>•
BlockingCollection<T>•
IProducerConsumerCollection<T>•
Partitioner, Partitioner<T>, OrderablePartitioner<T>•
zamiast•
System.Collections•
System.Collections.GenericTask Scheduler
Global Queue
Local
Queue Local
Queue
Worker Thread
1 Worker
Thread p Program
Thread Task 1
Task 2 Task 3
Task 5 Task 4
Task - .NET 4.X
◼
Zeby zadanie miało sens – jego długość powinna być > 200-300 cykli procesora
◼
Domyslnie scheduler używa globalnej puli (można to zmienić)
◼
Aby zapewnić np. priorytety lub niekorzystanie z .Net puli wątków konieczne jest
zaimplementowanie własnego schedulera.
Task - .NET 4.X
◼
Uruchamianie:
Task task = Task.Run (() => Console.WriteLine (“Task")) .Start();
Task<int> taskInt = Task.Run (() => { return 3; });
Task task = Task.Factory.StartNew ( () => {}, options);
◼
Czekanie:
task.Wait();
lub
int ret = taskInt.Result;
Task - .NET 4.X
◼
Długie zadania (aby nie blokować puli):
Task task = Task.Factory.StartNew (
() => …, TaskCreationOptions.LongRunning);
Opcje:
◼ LongRunning
◼ PreferFairness – probuje utrzymac kolejnosc wykonania odpowiadajaca kolejnosci tworzenia
◼ AttachedToParent – task potomny
◼ DenyChildAttach – rzuca wyjatek przy probie definicji potomnego task-u
◼ HideScheduler – używa domyslnego schedulera dla StartNew, ContinueWith
Task vs. wyjątki
◼ Nieobsłużone wyjątki są przechowywane (i wyrzucane)
w momencie odczytu zwrotu
lub wykonania Wait() na zadaniu
W 4.0 były wyrzucane przy finalizatorze (crash aplikacji)
◼ CLR opakowuje wyjątki w AggregateException Task task = Task.Run (
() => { throw new ApplicationException(“Problem”); });
try { task.Wait(); }
catch (AggregateException aex)
{ Console.WriteLine (aex.InnerException.ToString() ); }
◼ Stan zadania można sprawdzić przez IsFaulted i IsCanceled
◼ Sam wyjątek jest dostępny przez właściwość Exception
Task - kontynuacje
Task.Factory.StartNew<int> (() => 8)
.ContinueWith (ant => ant.Result * 2)
.ContinueWith (ant => Math.Sqr (ant.Result))
.ContinueWith (ant => Console.WriteLine (ant.Result));
Wynik: 256
Task - kontynuacje
var basicTask=new Task<int> (()=> {return 5;} );
basicTask.ContinueWith (antecedent =>
{
int result = antecedent.Result;
Console.WriteLine (result); // Writes 5 });
basicTask.Start();
◼ Brak konieczności synchronizacji
Task - kontynuacje
◼ Zwykle kolejne zadania wykonuje ten sam wątek (dla GUI zawsze powinien?),
◼ Można zdefiniować kilka kontynuacji dla jednego zadania.
◼ jesli chcemy mieć pewność: ContinueWith (antecedent =>{…}, TaskContinuationOptions.ExecuteSynchronously);
◼ Domyślnie kontynuacje zostaną skolejkowane, i przy wielu będą pot.
uruchomione jednoczesnie. ExecuteSynchronously je szerguje.
◼ Inne opcje: NotOnCanceled, OnlyOnCanceled, LazyCancellation, LongRunning, NotOnFaulted,
Task - zagnieżdzony
var parent = Task.Factory.StartNew(() => { Console.WriteLine("Outer task executing.");
var child = Task.Factory.StartNew(() => { Console.WriteLine("Nested task starting.");
Thread.SpinWait(500000);
Console.WriteLine("Nested task completing.");
});
});
parent.Wait();
Task - potomny
var parent = Task.Factory.StartNew(() => {
Console.WriteLine("Outer, parent task executing.");
var child = Task.Factory.StartNew(() => { Console.WriteLine("Nested task starting.");
Thread.SpinWait(500000);
Console.WriteLine("Nested, child task completing.");
}, TaskContinuationOptions. AttachedToParent);
});
parent.Wait();
T. – zagnieżdzony vs. potomny
◼ Task.Run – blokuje AttachedToParent tak jak opcja DenyChildAttach
◼ Task.Factory.StartNew – uwzględnia AttachedToParent
Nested AttachedToParent
Rodzic nie czeka na zakończenie
Rodzic czeka, tj. nie kończy wykonania przed dziecmi (ale nie robi join!!!)
Stan rodzica nie zależy od stanu tasku
Stan rodzica zależy od stanu tasku
Rodzic nie przechwytuje wyjątków
Rodzic przechwytuje wyjątki
Task – czekaj na wszystkie
var taskQueue = new Queue<Task>();
for (int i = 0; i < 10; i++) {
taskQueue.Enqueue(Task.Factory.StartNew(
() => { /* Do work. */ }));
}
// Perform some work with the tasks when they complete.
Task.Factory.ContinueWhenAll(taskQueue.ToArray(), completedTasks => { // Do continuation work.}
);
Task – czekaj na pierwszy
var taskQueue = new Queue<Task<int>>();
for (int i = 0; i < 10; i++) {
taskQueue.Enqueue(Task<int>.Factory.StartNew(() => { /*Do work.*/ }));
}
// Perform some work with the tasks when they complete.
Task.Factory.ContinueWhenAny(taskQueue.ToArray(), completedTask => {
Console.WriteLine(completedTask.Result); } );
Realizacja przetwórz po kolei
var taskQueue = new Queue<Task<int>>();
for (int i = 0; i < 10; i++) {
taskQueue.Enqueue(Task<int>.Factory.StartNew(() =>
{ /*Do work.*/ }));
}
// Wykonaj zbieranie gotowych wyników . while (! taskQueue.IsEmpty)
Task<int>.Factory.ContinueWhenAny(taskQueue.ToArray(), completedTask => {
Console.WriteLine(completedTask.Result);
taskQueue.Remove(completedTask);
} );
Anulowanie przetwarzania
Tradycyjne podejście:
◼
zabicie watków – Problemy:
Wykonanie akcji porządkujących
Jak zabijamy wątki z puli to…?
Co z anuowaniem jeśli są zadania (potomne, zagnieżdżone)
◼
Ustawienie flagi zakończ przetwarzanie Problemy:
Wątek może czekać na obiekcie synchr.
Unified cancelation Model
◼ token.ThrowIfCancellationRequested();
◼ token.IsCancellationRequested
CancellationTokenSource
OP2 OP3
OP1
Cancel()
Barrier - kod
static Barrier sync = new Barrier(3);
static CancellationToken token;
static void DriveToSeattle(string name, TimeSpan timeToGasStation) {
try { // Drive to gas station
Console.WriteLine("[{0}] Leaving House", name);
Thread.Sleep(timeToGasStation);
Console.WriteLine("[{0}] Arrived at Gas Station", name);
sync.SignalAndWait(token);
Console.WriteLine("[{0}] Leaving for Seattle", name);
catch(OperationCancelledException) {
Console.WriteLine("[{0}] Caravan was canceled", name);
} }
Barrier - kod
var source = new CancellationTokenSource();
token = source.Token;
var delay = TimeSpan.FromSeconds(1);
var charlie = new Thread(() => DriveToSeattle("Charlie“, delay));
charlie.Start();
var mac = new Thread(() => DriveToSeattle("Mac", delay));
mac.Start();
var dennis = new Thread(() => DriveToSeattle("Dennis", delay));
dennis.Start();
source.Cancel();
charlie.Join();
mac.Join();
dennis.Join();
Barrier - kod
static Barrier sync = new Barrier(3);
static CancellationToken token;
static void DriveToSeattle(string name, TimeSpan timeToGasStation) {
try { // Drive to gas station
Console.WriteLine("[{0}] Leaving House", name);
Thread.Sleep(timeToGasStation);
Console.WriteLine("[{0}] Arrived at Gas Station", name);
sync .SignalAndWait();
Console.WriteLine("[{0}] Leaving for Seattle", name);
catch(OperationCancelledException) {
Console.WriteLine("[{0}] Caravan was canceled", name);
} }
Granice zrównoleglania
◼ Zadania mają sens jeśli są krótkie (nie za krótkie ☺).
◼ 100 dlugich zadan – prowadzi powoli do 100 watków w puli … a to jest nieefektywne
◼ 100 dlugich zadan z opcja longrunning – prowadzi szybko do 100 wątków… poza pulą
◼ Optymalizacja:
◼ wystartowanie tylu zadań ile jest rdzeni i w WaitAny dorzucanie nowych zadań.
◼ Użycie Parallel loop/for/foreach - w opcjach można określić ogranicznie paralelizmu:
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism =
System.Environment.ProcessorCount;
Parallel.For (sourceCollection, options, item => Process(item));
Task – kontynuacja po nowemu
var basicTask = new Task<int> (()=> {return 15;} );
var awaiter = basicTask.GetAwaiter();
awaiter.OnCompleted (() =>
{
int result = awaiter. GetResult();
Console.WriteLine (result); // Writes 123 });
basicTask.Start();
Task – completition source
public class TaskCompletionSource<TResult> {
public void SetResult (TResult result);
public void SetException (Exception exception);
public void SetCanceled();
public bool TrySetResult (TResult result);
public bool TrySetException (Exception exception);
public bool TrySetCanceled();
}
var tcs = new TaskCompletionSource<int>();
new Thread (() => { int ret = DoSmthg(); tcs.SetResult (ret); }) .Start();
Task<int> task = tcs.Task; // dedykowany, task oczekujacy Console.WriteLine (task.Result); // poczeka i wypisze ret
wołana jedna z
completition source
Może być wykorzystane do czekania np. na I/O
Np.:
var timer = new System.Timers.Timer (1000) { AutoReset = false };
timer.Elapsed += delegate
{ timer.Dispose(); tcs.SetResult (10); };
Asynchroniczny delay – DoAfter
Task DoAfter (int msToDelay){
var tcs = new TaskCompletionSource<object>();
var timer = new System.Timers.Timer (msToDelay) { AutoReset = false };
timer.Elapsed += delegate
{ timer.Dispose(); tcs.SetResult (null); };
timer.Start();
return tcs.Task;
}
DoAfter(1000).GetAwaiter()
.OnCompleted (() => DoSmthgAfter1s ());
//lub
DoAfter(1000). ContinueWith() (ant => DoSmthgAfter1s ());
Operacje asynchroniczne
◼ Podejście wielowątkowe: Tworzymy kod jako synchroniczny i wywołujemy go w oddzielnym wątku
◼ Podejście asynchroniczne: Funkcja może działać jeszcze po zwróceniu sterowania. Dopóki nie próbujemy uzyskać
wyniku od operacji działającej istotnie długo – nie ma to wpływu na wątek, który ja wywołał
◼ Przykładem są np. ContinueWith/OnContinue 2 typowe scenariusze:
◼ Po stronie serwerowej duza ilość operacji IO
◼ W aplikacji klienckiej uproszenie złożonej logiki wielowątkowej synchronizacji
Operacje asynchroniczne
◼ Model zalecany dla np. dla metro, SL, wydajnego serwera wielowątkowego (I/O bounded)
◼ Nie startujemy nowych wątków ex-plicite
◼ długie operacje (>=50ms) uruchamiamy jako asynchroniczne
◼ Krótkie operacje robimy w wątku GUI
◼ Zasadniczo można nie robić synchroniazacji dostepu do zasobów
Operacje asynchroniczne APM
◼ „Zrób swoje” z użyciem CPU, a potem wyjdź i pozwól wywołać Call-back
IAsyncResult BeginXXX (args,
AsyncCallback callback, object state);
public delegate void AsyncCallback (IAsyncResult ar);
return-type EndXXX (IAsyncResult ar);
◼ N.p.: dla stream:
public IAsyncResult BeginRead (byte[] buffer, int offset, int size, AsyncCallback callback, object state);
public int EndRead (IAsyncResult asyncResult);
IAsyncResult
public interface IAsyncResult {
// "state" object passed to Begin.
object AsyncState { get; } // Signaled when complete.
WaitHandle AsyncWaitHandle { get; }
// Did it complete on BeginX (quick, APN-not supported, //CPU-bond so completed wthout blocking?
bool CompletedSynchronously { get; }
// Has it completed yet?
bool IsCompleted { get; } }
TPL vs Async
TaskFactory.FromAsync(
◼ BeginXXX method
◼ EndXXX method
◼ Dodatkowe parametry
Task<int> readChunk = Task<int>.Factory.FromAsync ( stream.BeginRead,
stream.EndRead, buffer, 0, 1000, null);
Przykład wielowątkowy
int GetPrimesCount (int start, int count) {
return ParallelEnumerable.Range (start, count).Count ( n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (
i => n % i > 0));
}
void DisplayPrimeCounts () { const int RANGE = 1000000;
for (int i = 0; i < 10; i++)
Console.WriteLine (" Znaleziono: " +
GetPrimesCount (i*RANGE + 2, RANGE) +
" liczb pierwszych między " + (i * RANGE) +
" oraz " + ((i+1) * RANGE -1));
}
P. asynchroniczny “naiwny”
Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (
() => ParallelEnumerable.Range (start, count).Count ( n =>Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (
i => n % i > 0)));
}
void DisplayPrimeCounts() {
const int RANGE = 1000000;
for (int ii = 0; ii < 10; ii++) {// kolejnosc ???
var i = ii;
var awaiter = GetPrimesCountAsync (
i* RANGE + 2, RANGE).GetAwaiter();
awaiter.OnCompleted (() => Console.WriteLine (
“Znaleziono:” + awaiter.GetResult() + “... "));
} } Kolejność... + Koniec zanim liczby zostaną wypisane
Przykład asynchroniczny
void DisplayPrimeCountsFrom (int i, int count ){
const int RANGE = 1000000;
var awaiter = GetPrimesCountAsync (i * RANGE + 2, RANGE).GetAwaiter();
awaiter.OnCompleted (() => {
Console.WriteLine (“Znaleziono:”, awaiter.GetResult()+ "…");
if (count>0) DisplayPrimeCountsFrom (i,count-1);
else Console.WriteLine ("Done");
});
}
void DisplayPrimeCounts() {
DisplayPrimeCountsFrom (0, 10);
}
◼ DisplayPrimeCounts sama nie jest asynchroniczna
Przykład asynchroniczny II
class PrimesStateMachine { readonly int MaxCount = 10;
const int RANGE = 1000000;
TaskCompletionSource<object> _tcs =
new TaskCompletionSource<object>();
public Task Task { get { return _tcs.Task; } } public void DisplayPrimeCountsFrom (int i) {
var awaiter = GetPrimesCountAsync (
i* RANGE +2, RANGE).GetAwaiter();
awaiter.OnCompleted (() => {
Console.WriteLine (awaiter.GetResult());
if (i++ < MaxCount) DisplayPrimeCountsFrom (i);
else { Console.WriteLine ("Done"); _tcs.SetResult (null); } });
} }
Przykład asynchroniczny II cd.
Task DisplayPrimeCountsAsync() {
var machine = new PrimesStateMachine();
machine.DisplayPrimeCountsFrom(0);
return machine.Task;
}
C# >= 5.0
◼ Kod:
var result = await wyrażenie;
Instukcja(je);
◼ Jest rozwijany do:
var awaiter = wyrażenie.GetAwaiter();
awaiter.OnCompleted (() =>
{
var result = awaiter.GetResult();
Instukcja(je);
);
Przykład asynchroniczny
Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (
() => ParallelEnumerable.Range (start, count).Count (
n =>Enumerable.Range (2, (int)Math.Sqrt(n)-1).All ( i => n % i > 0)));
}
async void DisplayPrimeCounts() {
const int RANGE = 1000000;
for (int i = 0; i < 10; i++) Console.WriteLine (
await GetPrimesCountAsync (i* RANGE +2, RANGE));
}
…
async void DisplayPrimesCount() {
int result = await GetPrimesCountAsync (2, 1000000);
Console.WriteLine (result);
}
Odpowiada funkcjonalnie następującemu kodowi void DisplayPrimesCount() {
var awaiter = GetPrimesCountAsync (2, 1000000).GetAwaiter();
awaiter.OnCompleted (() => { int result = awaiter.GetResult();
Console.WriteLine (result);
});
}
C# >= 5.0 async
◼ Może być zaaplikowany do metod zwracających
void
Task
Task<T>
◼ Nie zmienia sygnatury (podobnie jak unsafe)
◼ W ciele metody napotkanie await zwraca sterownie (podobnie jak yield)
◼ Async może być dodany do labdy i metod anonimowych
C# >=5.0 await
◼ Typowo wywoływany jest na Task-u
◼ Wystarczy aby przedmiot wołania await miał
Metodę GetAwaiter która zwróci obiekt implementujący
INotifyCompletion.OnCompleted (tj. GetResult zwracające odpowiedni typ i właściwość IsCompleted)
◼ Może wystąpic w metodach asynchronicznych praktycznie wszędzie z wyjątkiem:
catch / finally
Wyrażenia lock,
Kontekstu unsafe
Punktu wejścia do aplikacji (main method).
Równoległość ponownie
Wołanie metod async bez await powoduje ich równoległe (thread pool) wykonanie.
async Task DoSmthg1() {
… }
async Task DoSmthg2() {
… }
var task1 = DoSmthg1();
var task2 = DoSmthg2();
Konstrukcja bogatego GUI
Etapy:
◼ Piszemy metody synchronicznie
◼ Zamieniamy synchroniczne wołania na asynchroniczne i wykonujemy na nich await
◼ Z wyjątkiem głównych metod (obsługa zdarzeń w GUI)
zamieniamy typy zwracane na Task lub Task<TResult> tak by można było na nich czekać