• Nie Znaleziono Wyników

.NET: xxxslim Obiekty pracujace w obrębie jednego procesu SemaphoreSlim (5x(?) mniejszy narzut) ReaderWriterLock zastąpiony przez ReaderWriterLockSlim

N/A
N/A
Protected

Academic year: 2022

Share ".NET: xxxslim Obiekty pracujace w obrębie jednego procesu SemaphoreSlim (5x(?) mniejszy narzut) ReaderWriterLock zastąpiony przez ReaderWriterLockSlim"

Copied!
88
0
0

Pełen tekst

(1)

.NET: xxxSlim

Obiekty pracujace w obrębie jednego procesu

SemaphoreSlim (5x(?) mniejszy narzut)

ReaderWriterLock zastąpiony przez

ReaderWriterLockSlim

(2)

Synchronizacja w trybie użytkownika

Dużo mniej kosztowna niż wykorzystanie obiektów jądra

Możliwe jest synchronizowanie tylko wątków tego samego procesu

Funkcje atomowe:

InterlockedIncrement,

InterlockedDecrement,

InterlockedExchancheAdd,

InterlockedExchanche, (32bit)

InterlockedExchanchePointer, (64bit)

InterlockedCompareExchanche

.NET - class Interlooked

CompareExchange, Decrement, Exchange, Increment

(3)

Sekcja krytyczna

Synchronizacja w trybie użytkownika

Możliwe jest synchronizowanie tylko wątków tego samego procesu

Uwaga wykonanie „wait” może spowodować – zmianę trybu

Typ obiektu: CRITICAL_SECTION Operacje na sekcji krytycznej:

InitializeCriticalSection inicjalizacja sekcji krytycznej

DeleteCriticalSection zwolnienie sekcji krytycznej

EnterCriticalSection wejście do sekcji krytycznej

LeaveCriticalSection wyjście z sekcji krytycznej

TryEnterCriticalSection sprawdzenie sekcji krytycznej

.NET: Monitor

(4)

Sekcja krytyczna – C# - lock

Kontekst pozwala identyfikować poszczególne instancje sekcji krytycznej

Aby uzyskać globalny kontekst można skorzystać z konstrukcji typeof np.

lock(typeof(myObject))

synchronizowalne wrapery do zmiennych np.

Hashtable myHT = new Hashtable();

Hashtable mySyncHT = Hashtable.Synchronized(myHT);

System.Threading.Monitor.Enter(x);

try { ... } finally {

System.Threading.Monitor.Exit(x);

} lock (x) {

....

}

(5)

Dostęp do zmiennych

Thread Local Storage

LocalDataStoreSlot lds =

Thread.GetNamedDataSlot("COOKIE");

Thread.SetData(lds, "abc");

var napis = (string) Thread.GetData(lds);

Synchronizowalne wrapery do zmiennych np.

Hashtable myHT = new Hashtable();

Hashtable mySyncHT = Hashtable.Synchronized(myHT);

lock(mySyncHT.SyncRoot) {

foreach (Object item in mySyncHT) { // to do smthg }

(6)

Obiekty synchronizacyjne

Synchronizacja w trybie jądra jest dużo bardziej kosztowna

Odmiany semaforów

Mutex – semafor binarny

Semaphore – semafo wielowartościowy

Event – zdarzenie

Waitable timer – budzik: NT, 2K, XP

powiadomienie o zmianie w systemie plików

Obiekty, które mają własności semaforów:

proces, wątek - sygnalizowane po zakończeniu

zadanie - sygnalizowane po wyczerpaniu limitu czasu

plik - sygnalizowane gdy nie trwają operacje we/wy

wejście konsoli - sygnalizowane gdy są jakieś znaki w buf.

(7)

Nazwy obiektów synchronizacyjnych

korzystanie z jednego obiektu przez różne procesy

małe i duże litery są rozróżniane

długość nieprzekraczająca _MAX_PATH

znaki dozwolone takie jak dla nazwy pliku (bez \)

obiekty synchronizacyjne dzielą tę samą przestrzeń nazw

przestrzeń nazw dla obiektów

synchronizacyjnych jest rozłączna z przestrzenią

nazw systemu plików

(8)

Obiekt synchronizacyjny

obiekt przyjmujący stany

signaled – semafor podniesiony

not signaled – semafor opuszczony

operacje

zasygnalizowanie (podniesienie semafora - funkcje dedykowane dla typu obiektu

likwidacja stanu zasygnalizowanego

(opuszczenie semafora) – funkcje wspólne

dla wszystkich typów obiektów

(9)

Opuszczanie semafora

opuszczenie pojedynczego semafora

DWORD WaitForSingleObject (HANDLE hObject, DWORD

dwMiliseconds) ; INFINITE

opuszczanie zbioru semaforów

DWORD WaitForMultipleObjects (DWORD nCount, CONST HANDLE*

lpHandles, BOOL bWaitAll, DWORD dwMiliseconds)

Oczekiwanie przerywane komunikatami

MsgWaitForMultipleObjects

Oczekiwanie przerywane operacjami I/O

WaitForSingleObjectEx((HANDLE hObject, DWORD dwMiliseconds, BOOL bAlertable) ;

WaitForMultipleObjectsEx, MsgWaitForMultipleObjectsEx SignalObjectAndWait(HANDLE hObjectToSignal,

HANDLE hObjectToWait, DWORD dwMiliseconds, BOOL bAlertable) ;

(10)

Opuszczanie semafora .NET

class WaitHandle {

public virtual Boolean WaitOne();

public static Boolean WaitAll(WaitHandle[]);

public static Boolean WaitAny(WaitHandle[]);

public virtual Boolean WaitOne(int, bool);

public virtual Boolean WaitOne(TimeSpan, bool);

public virtual Boolean SignalAndWait (WaitHandle, WaitHandle) public virtual Boolean SignalAndWait (WaitHandle, WaitHandle, TimeSpan, Boolean);

public virtual IntPtr Handle { get; set; }

public SafeWaitHandle SafeWaitHandle { get; set; } };

(11)

Opuszczanie semafora .NET

class ManualResetEvent : WaitHandle;

class AutoResetEvent : WaitHandle;

class Mutex : WaitHandle;

class Semaphore : WaitHandle;

(12)

DeadLock

object l1 = new object();

object l2 = new object();

new Thread (() => { lock (l1) {

Thread.Sleep (1000);

lock (l2); // Deadlock }

}).Start();

lock (l2) {

Thread.Sleep (1000);

lock (l1); // Deadlock }

(13)

DeadLock - zapobieganie

Zajmowanie zasobów w takiej samej kolejności

Użycie WaitAll

(14)

Leniwa inicjalizacja

class Foo {

Expensive _expensive;

public Expensive expensive { // Lazily instantiate Expensive get {

if (_expensive == null) _expensive = new Expensive();

return _expensive;

} }

...

}

lock

(15)

Mutex

Semafor binarny - obiekt dwustanowy

Umożliwia synchronizację wątków dowolnych procesów

Podniesienie semafora binarnego wznawia tylko jeden wątek

Mutex jest własnością wątku:

wątek nie czeka na zajętym już przez siebie Mutexie, ale trzeba odpowiednią liczbę razy wołać ReleaseMutex)

podnieść Mutexa może tylko wątek, który go opuścił (dla innych wątków ReleaseMutex nie daje efektu)

zakończenie wątku będącego włąścicielem muteksa podnosi ten semafor -> wynik WaitFor... = WAIT_ABANDONED

Operacje na semaforze binarnym

CreateMutex (PSECURITY_ATRIBUTE psa, BOOL bInitialOwned, PCSTR pszName)

OpenMutex

ReleaseMutex podniesienie, opuszczanie semafora: WaitFor...

.NET: Mutex

(16)

Semaphore

Semafor wielowartościowy cechuje ograniczona liczba stanów (0 oznacza semafor opuszczony)

Próba opuszczenia semafor zmniejsza licznik o 1

Podniesienie semafora zwiększa licznik i wznawia tyle wątków, o ile został zmniejszony licznik

Opuszczenie i podniesienie może być wykonane przez różne wątki

operacje na semaforze wielowartościowym

CreateSemaphore (PSECURITY_ATRIBUTE psa,

LONG lInitialCount, LONG lMaximumCount, PCSTR pszName)

OpenSemaphore

ReleaseSemaphore podniesienie

.NET: Semaphore, SemaphoreSlim (>=.Net 4.0 – zalecany, pracuje hybrydowo tj. nie uzywa obj. jądra póki nie jest to

konieczne)

(17)

Event

Zdarzenie - obiekt dwustanowy, służący do sygnalizowania zajścia wydarzenia

Synchronizacja wątków dowolnych procesów

Typy zdarzeń

manual-reset event - podniesienie wznawia wszystkie wątki

automatycznie opuszczane - podniesienie wznawia zawsze jeden wątek

Operacje na zdarzeniu

CreateEvent (PSECURITY_ATRIBUTE psa,

BOOL bManual, BOOL bInitial, PCSTR pszName)

OpenEvent

SetEvent - podniesienie

ResetEvent - opuszczenie

PulseEvent - podniesienie i opuszczenie - zwalnia wszystkie czekające wątki (manual) lub jeden(auto)

.Net: AutoResetEvent, ManualResetEvent

AutoResetEventSlim, ManualResetEventSlim (.Net 4.5)

(18)

.NET: CountdownEvent

F O R K

J O I N

Master Thread

Parallel Region

Master Thread

CountdownEvent jest obiektem synchronizacyjnym który pozwala śledzić wykonanie wiekszej liczby zadań i

sygnalizować ich zakończenie.

(19)

CountdownEvent - Kod

public static void DoShoping(int id) { Thread.SpinWait(2000000);

Console.WriteLine("Customer {0} finished",id);

}

var syncEvent = new CountdownEvent(1);

foreach (int id in Enumerable.Range(1,20)) { int currentId = id;

syncEvent.AddCount();

ThreadPool.QueueUserWorkItem(delegate { DoShoping(currentId);

syncEvent.Signal(); });

}

syncEvent.Signal();

syncEvent.Wait();

Console.WriteLine("All customers finished Shopping");

(20)

.NET: Barrier

Mac

Charlie

Dennis

Gas Station = Barrier

Bost on

Barrier jest obiektem synchronizacyjnym, który pozwala na zatrzymanie wykonania większej liczby wątków w określonym punkcie dopóki nie zostanie on osiagnięty przez wszystkie

wątki

(21)

Barrier - kod

var delay = TimeSpan.FromSeconds(1);

var charlie = new Thread(() => DriveToSeattle("Charlie“, delay));

charlie.Start();

var mac = new Thread(() => DriveToSeattle("Mac", delay));

mac.Start();

var dennis = new Thread(() => DriveToSeattle("Dennis", delay));

dennis.Start();

charlie.Join();

mac.Join();

dennis.Join();

(22)

Barrier - kod

static void DriveToSeattle(string name, TimeSpan timeToGasStation) {

// Drive to gas station

Console.WriteLine("[{0}] Leaving House", name);

Thread.Sleep(timeToGasStation);

Console.WriteLine("[{0}] Arrived at Gas Station", name);

// Need to sync here

// Perform some more work

Console.WriteLine("[{0}] Leaving for Seattle", name);

}

(23)

Barrier - kod

static Barrier sync = new Barrier(3);

static void DriveToSeattle(string name, TimeSpan timeToGasStation) {

// Drive to gas station

Console.WriteLine("[{0}] Leaving House", name);

Thread.Sleep(timeToGasStation);

Console.WriteLine("[{0}] Arrived at Gas Station", name);

// Need to sync here

sync .SignalAndWait();

// Perform some more work

Console.WriteLine("[{0}] Leaving for Seattle", name);

}

(24)

.NET: ReaderWriterLockSlim

Semafor dedykowany dla asymetrycznej sytuacji gdze istnieje możliwość wielu

odczytów vs. dostęp wyłączny

operacje na semaforze

IsReaderLockHeld, IsWriterLockHeld

WriterSeqNum

AcquireReaderLock, AcquireWriterLock

AnyWritersSince

UpgradeToWriterLock, DowngradeFromWriterLock

ReleaseReaderLock, ReleaseWriterLock

RestoreLock, ReleaseLock.

(25)

Powiadomienie o zmianie w systemie plików

powiadomienie jest semaforem, który zmienia stan na

podniesiony w chwili wystąpienia zmiany w systemie plików

zlecenie dokonania pierwszego powiadomienia HANDLE FindFirstChangeNotification(

LPTSTR lpszPath, // ścieżka

BOOL fWatchSubTree, // czy zmiana poddrzewa DWORD fdwFilter) ; // rodzaj zmiany

FindNextChangeNotification - zlecenie dokonania kolejnego powiadomienia

FindCloseChangeNotification - rezygnacja

WaitFor... - oczekiwanie na powiadomienie

.NET: System.IO.FileSystemWatcher

(26)

Inne metody synchronizacji

WaitForInputIddle (HANDLE hProcess, DWORD wMillisecinds)

.NET : System.Diagnostics.Process.WaitForInputIdle Czeka aż wątek główny przetworzy wszystkie

komunikaty (np. emulacja naciśnięć klawiszy)

(27)

WIELOZADANIOWOŚĆ W .NET

(28)

Koszt wątku:

Pamięć:

Obiekt jądra (1.2 kB)

Thread environment block (4/8 kB 32/64b)

Stos (tryb użytkownika 1MB)

Stos (tryb jądra 12/24kB dla 32/64b)

DllMain -> DLL_THREAD_ATTACH/

DLL_THREAD_DETACH

Wniosek: wątki kosztują i nie należy ich

nadużywać

(29)

Wątki

Wątek CLR == wątek systemowy (jak dotąd) Start nowego wątku:

new Thread (() => { } ).Start();

new Thread (() => { } ).Start(startState);

Jaki będzie wynik dla:

for (int i = 0; i < 10; i++)

new Thread (() => Console.Write (i)).Start();

(30)

Wątki

Wątek CLR == wątek systemowy (jak dotąd) Start nowego wątku:

new Thread (() => { } ).Start();

new Thread (() => { } ).Start(startState);

Jaki będzie wynik dla:

for (int i = 0; i < 10; i++)

new Thread (() => Console.Write (i)).Start();

a dla

for (int i = 0; i < 10; i++) { int temp = i;

new Thread (() => Console.Write (temp)).Start();

}

(31)

Wątki

Kiedy używać:

długie zadania,

priorytet inny niż normal

zadania foreground

(32)

Pula wątków - .NET

Obiekt systemowy: ThreadPool

Uruchamianie dla void:

ThreadPool.QueueUserWorkItem (

notUsed => Console.WriteLine ("Msg from pool"));

lub dla zwrotu (alternatywnie można zdef. f. callb.)

static int Work (string s) { return s.Length; }

Func<string, int> method = Work;

IAsyncResult cookie = method.BeginInvoke ("test", null, null);

int result = method.EndInvoke (cookie);

Console.WriteLine (“Result: " + result);

(33)

Pula wątków - .NET

Kiedy używać:

krótkie zadania (<250ms, idealnie <100ms),

priorytet normal,

zadania background

ThreadPool.Set[Max|Min]Thread nowe wątki są

alokowane od Min do Max jeśli przez jakiś czas (0.5s) nie zmienia się kolejka zadań do wykonania.

Używane przez : WCF, Remoting, ASP.NET, and ASMX Web Services, System.Timers.Timer i

System.Threading.Timer, BackgroundWorker

asynchr. delegaty

(34)

Wątki, pula

Pot. problemy:

Odebranie i przekazanie ew. wyjątków

Czekanie na wątek (synchronizacja) blokuje inny wątek

Czekanie:

new Thread (() => Console.Write (“test”)).Start().Join();

lub

var endOfWork = new CountdownEvent(10);

for (var i=1; i<=10; i++)

new Thread (() =>endOfWork.signal()).Start();

endOfWork.wait();

(35)

Czekanie inaczej

var threads = new list<Thread>();

for (var i=1; i<=10; i++) {

var t = new Thread (() =>endOfWork.signal());

t.Start();

threads.Add();

}

WaitHandle.WaitAny(threads, true); // false ?

(36)

Parallel Extensions .Net 4.x

.NET Library

Mogą być wykorzystywane we wszystkich językach na platformie .NET

Obejmuje 3 różne elementy

Parallel LINQ (PLINQ)

Task Parallel Library (TPL)

Coordination Data Structures (CDS)

(37)

Od watków do zadań (1)

static void Main() {

Tree tr = Tree.CreateSomeTree(9, 1);

Stopwatch sw = Stopwatch.StartNew();

WalkTree(tr);

Console.WriteLine("Elapsed= " + sw.ElapsedMilliseconds.ToString());

Console.ReadLine();

}

static void WalkTree(Tree tree) { if (tree == null) return;

WalkTree(tree.Left);

WalkTree(tree.Righ);

ProcessItem(tree.Data);

}

(38)

Od watków do zadań (2)

static void WalkTree(Tree tree) { if (tree == null) return;

Thread left = new Thread((o) => WalkTree(tree.Left));

left.Start();

Thread righ = new Thread((o) => WalkTree(tree.Righ));

righ.Start();

left.Join(); righ.Join();

ProcessItem(tree.Data);

}

(39)

Od watków do zadań (3)

static void WalkTree(Tree tree) {

if (tree == null) return;

Task left = new Task((o) => WalkTree(tree.Left));

left.Start();

Task righ = new Task((o) => WalkTree(tree.Righ));

righ.Start();

left.Wait();

righ.Wait();

ProcessItem(tree.Data);

}

(40)

Parallel LINQ-to-Objects

Wykorzystuje model "Task"

Pozwala na wykorzystanie wielu rdzeni przy zapytaniach LINQ

Wspiera w pełni standardowe operatory zapytań

Minimalizuje wpływ na istniejące zapytania LINQ

var q = from p in people

where p.Name == queryInfo.Name &&

p.State == queryInfo.State &&

p.Year >= yearStart &&

p.Year <= yearEnd orderby p.Year ascending select p;

.AsParallel()

(PLINQ)

(41)

PLINQ

IEnumerable<int> numbers =

Enumerable.Range (3, 100000-3);

var parallelQuery =

from n in numbers.AsParallel()

where Enumerable.Range (2, (int) Math.Sqrt (n)) .All (i => n % i > 0)

select n;

int[] primes = parallelQuery.ToArray();

Dla operatorów akceptujących 2 sekwencje (Join, GroupJoin, Concat, Union, Intersect, Except, Zip) AsParalel musi być zaaplikowany do obu sekwencji

(42)

PLINQ

Warto stosować:

Dla relatywnie prostych przypadków

W przypadku bardziej skomplikowanych rozwiazań to podejście może być mniej czytelne

(43)

Parallel Static Class

Gdy instrukcje są niezależne moga być zrównoleglone

StatementA();

StatementB();

StatementC();

Parallel.Invoke(

() => StatementA(), () => StatementB(), () => StatementC() );

(44)

Parallel.For - Przykład

void NonParalelMethod() { for (int i=0; i<16; i++) {

Console.WriteLine(“TID={0}, i={1}”,

Thread.CurrentThread.ManagedThreadId, i);

SimulateProcessing(i);

} }

void ParalelMethod() {

Paralel.For (0, 16, i => {

Console.WriteLine(“TID={0}, i={1}”,

Thread.CurrentThread.ManagedThreadId, i);

SimulateProcessing(i);

} }

(45)

Paralel Invoke

public static void Invoke (params Action[] actions);

Parallel.Invoke (

() => new WebClient().DownloadFile ("http://www.wp.pl", “index.html"), () => new WebClient().DownloadFile

("http://www.pg.gda.pl", “index.html") );

public static void Invoke (ParallelOptions options, params Action[] actions);

Np. Cancelation token

(46)

Paralel Enumerable

public static void Invoke (params Action[] actions);

val w = ParallelEnumerable.Range (1, 10000000).Sum ( i => Math.Sqrt (i))

public static void Invoke (ParallelOptions options, params Action[] actions);

(47)

System.Collections.Concurrent

Przy przetwarzaniu wielozadaniowym należy wykorzystywać klasy kolekcji bezpiecznych ("thread-safe")

ConcurrentStack<T>

ConcurrentQueue<T>

ConcurrentLinkedList<T>

ConcurrentDictionary<TKey,TValue>

ConcurrentBag<TKey,TValue>

BlockingCollection<T>

IProducerConsumerCollection<T>

Partitioner, Partitioner<T>, OrderablePartitioner<T>

zamiast

System.Collections

System.Collections.Generic

(48)

Task Scheduler

Global Queue

Local

Queue Local

Queue

Worker Thread

1 Worker

Thread p Program

Thread Task 1

Task 2 Task 3

Task 5 Task 4

(49)

Task - .NET 4.X

Zeby zadanie miało sens – jego długość powinna być > 200-300 cykli procesora

Domyslnie scheduler używa globalnej puli (można to zmienić)

Aby zapewnić np. priorytety lub niekorzystanie z .Net puli wątków konieczne jest

zaimplementowanie własnego schedulera.

(50)

Task - .NET 4.X

Uruchamianie:

Task task = Task.Run (() => Console.WriteLine (“Task")) .Start();

Task<int> taskInt = Task.Run (() => { return 3; });

Task task = Task.Factory.StartNew ( () => {}, options);

Czekanie:

task.Wait();

lub

int ret = taskInt.Result;

(51)

Task - .NET 4.X

Długie zadania (aby nie blokować puli):

Task task = Task.Factory.StartNew (

() => …, TaskCreationOptions.LongRunning);

Opcje:

LongRunning

PreferFairness – probuje utrzymac kolejnosc wykonania odpowiadajaca kolejnosci tworzenia

AttachedToParent – task potomny

DenyChildAttach – rzuca wyjatek przy probie definicji potomnego task-u

HideScheduler – używa domyslnego schedulera dla StartNew, ContinueWith

(52)

Task vs. wyjątki

Nieobsłużone wyjątki są przechowywane (i wyrzucane)

w momencie odczytu zwrotu

lub wykonania Wait() na zadaniu

W 4.0 były wyrzucane przy finalizatorze (crash aplikacji)

CLR opakowuje wyjątki w AggregateException Task task = Task.Run (

() => { throw new ApplicationException(“Problem”); });

try { task.Wait(); }

catch (AggregateException aex)

{ Console.WriteLine (aex.InnerException.ToString() ); }

Stan zadania można sprawdzić przez IsFaulted i IsCanceled

Sam wyjątek jest dostępny przez właściwość Exception

(53)

Task - kontynuacje

Task.Factory.StartNew<int> (() => 8)

.ContinueWith (ant => ant.Result * 2)

.ContinueWith (ant => Math.Sqr (ant.Result))

.ContinueWith (ant => Console.WriteLine (ant.Result));

Wynik: 256

(54)

Task - kontynuacje

var basicTask=new Task<int> (()=> {return 5;} );

basicTask.ContinueWith (antecedent =>

{

int result = antecedent.Result;

Console.WriteLine (result); // Writes 5 });

basicTask.Start();

Brak konieczności synchronizacji

(55)

Task - kontynuacje

Zwykle kolejne zadania wykonuje ten sam wątek (dla GUI zawsze powinien?),

Można zdefiniować kilka kontynuacji dla jednego zadania.

jesli chcemy mieć pewność: ContinueWith (antecedent =>{…}, TaskContinuationOptions.ExecuteSynchronously);

Domyślnie kontynuacje zostaną skolejkowane, i przy wielu będą pot.

uruchomione jednoczesnie. ExecuteSynchronously je szerguje.

Inne opcje: NotOnCanceled, OnlyOnCanceled, LazyCancellation, LongRunning, NotOnFaulted,

(56)

Task - zagnieżdzony

var parent = Task.Factory.StartNew(() => { Console.WriteLine("Outer task executing.");

var child = Task.Factory.StartNew(() => { Console.WriteLine("Nested task starting.");

Thread.SpinWait(500000);

Console.WriteLine("Nested task completing.");

});

});

parent.Wait();

(57)

Task - potomny

var parent = Task.Factory.StartNew(() => {

Console.WriteLine("Outer, parent task executing.");

var child = Task.Factory.StartNew(() => { Console.WriteLine("Nested task starting.");

Thread.SpinWait(500000);

Console.WriteLine("Nested, child task completing.");

}, TaskContinuationOptions. AttachedToParent);

});

parent.Wait();

(58)

T. – zagnieżdzony vs. potomny

Task.Run – blokuje AttachedToParent tak jak opcja DenyChildAttach

Task.Factory.StartNew – uwzględnia AttachedToParent

Nested AttachedToParent

Rodzic nie czeka na zakończenie

Rodzic czeka, tj. nie kończy wykonania przed dziecmi (ale nie robi join!!!)

Stan rodzica nie zależy od stanu tasku

Stan rodzica zależy od stanu tasku

Rodzic nie przechwytuje wyjątków

Rodzic przechwytuje wyjątki

(59)

Task – czekaj na wszystkie

var taskQueue = new Queue<Task>();

for (int i = 0; i < 10; i++) {

taskQueue.Enqueue(Task.Factory.StartNew(

() => { /* Do work. */ }));

}

// Perform some work with the tasks when they complete.

Task.Factory.ContinueWhenAll(taskQueue.ToArray(), completedTasks => { // Do continuation work.}

);

(60)

Task – czekaj na pierwszy

var taskQueue = new Queue<Task<int>>();

for (int i = 0; i < 10; i++) {

taskQueue.Enqueue(Task<int>.Factory.StartNew(() => { /*Do work.*/ }));

}

// Perform some work with the tasks when they complete.

Task.Factory.ContinueWhenAny(taskQueue.ToArray(), completedTask => {

Console.WriteLine(completedTask.Result); } );

(61)

Realizacja przetwórz po kolei

var taskQueue = new Queue<Task<int>>();

for (int i = 0; i < 10; i++) {

taskQueue.Enqueue(Task<int>.Factory.StartNew(() =>

{ /*Do work.*/ }));

}

// Wykonaj zbieranie gotowych wyników . while (! taskQueue.IsEmpty)

Task<int>.Factory.ContinueWhenAny(taskQueue.ToArray(), completedTask => {

Console.WriteLine(completedTask.Result);

taskQueue.Remove(completedTask);

} );

(62)

Anulowanie przetwarzania

Tradycyjne podejście:

zabicie watków – Problemy:

Wykonanie akcji porządkujących

Jak zabijamy wątki z puli to…?

Co z anuowaniem jeśli są zadania (potomne, zagnieżdżone)

Ustawienie flagi zakończ przetwarzanie Problemy:

Wątek może czekać na obiekcie synchr.

(63)

Unified cancelation Model

token.ThrowIfCancellationRequested();

token.IsCancellationRequested

CancellationTokenSource

OP2 OP3

OP1

Cancel()

(64)

Barrier - kod

static Barrier sync = new Barrier(3);

static CancellationToken token;

static void DriveToSeattle(string name, TimeSpan timeToGasStation) {

try { // Drive to gas station

Console.WriteLine("[{0}] Leaving House", name);

Thread.Sleep(timeToGasStation);

Console.WriteLine("[{0}] Arrived at Gas Station", name);

sync.SignalAndWait(token);

Console.WriteLine("[{0}] Leaving for Seattle", name);

catch(OperationCancelledException) {

Console.WriteLine("[{0}] Caravan was canceled", name);

} }

(65)

Barrier - kod

var source = new CancellationTokenSource();

token = source.Token;

var delay = TimeSpan.FromSeconds(1);

var charlie = new Thread(() => DriveToSeattle("Charlie“, delay));

charlie.Start();

var mac = new Thread(() => DriveToSeattle("Mac", delay));

mac.Start();

var dennis = new Thread(() => DriveToSeattle("Dennis", delay));

dennis.Start();

source.Cancel();

charlie.Join();

mac.Join();

dennis.Join();

(66)

Barrier - kod

static Barrier sync = new Barrier(3);

static CancellationToken token;

static void DriveToSeattle(string name, TimeSpan timeToGasStation) {

try { // Drive to gas station

Console.WriteLine("[{0}] Leaving House", name);

Thread.Sleep(timeToGasStation);

Console.WriteLine("[{0}] Arrived at Gas Station", name);

sync .SignalAndWait();

Console.WriteLine("[{0}] Leaving for Seattle", name);

catch(OperationCancelledException) {

Console.WriteLine("[{0}] Caravan was canceled", name);

} }

(67)

Granice zrównoleglania

Zadania mają sens jeśli są krótkie (nie za krótkie ☺).

100 dlugich zadan – prowadzi powoli do 100 watków w puli … a to jest nieefektywne

100 dlugich zadan z opcja longrunning – prowadzi szybko do 100 wątków… poza pulą

Optymalizacja:

wystartowanie tylu zadań ile jest rdzeni i w WaitAny dorzucanie nowych zadań.

Użycie Parallel loop/for/foreach - w opcjach można określić ogranicznie paralelizmu:

ParallelOptions options = new ParallelOptions();

options.MaxDegreeOfParallelism =

System.Environment.ProcessorCount;

Parallel.For (sourceCollection, options, item => Process(item));

(68)

Task – kontynuacja po nowemu

var basicTask = new Task<int> (()=> {return 15;} );

var awaiter = basicTask.GetAwaiter();

awaiter.OnCompleted (() =>

{

int result = awaiter. GetResult();

Console.WriteLine (result); // Writes 123 });

basicTask.Start();

(69)

Task – completition source

public class TaskCompletionSource<TResult> {

public void SetResult (TResult result);

public void SetException (Exception exception);

public void SetCanceled();

public bool TrySetResult (TResult result);

public bool TrySetException (Exception exception);

public bool TrySetCanceled();

}

var tcs = new TaskCompletionSource<int>();

new Thread (() => { int ret = DoSmthg(); tcs.SetResult (ret); }) .Start();

Task<int> task = tcs.Task; // dedykowany, task oczekujacy Console.WriteLine (task.Result); // poczeka i wypisze ret

wołana jedna z

(70)

completition source

Może być wykorzystane do czekania np. na I/O

Np.:

var timer = new System.Timers.Timer (1000) { AutoReset = false };

timer.Elapsed += delegate

{ timer.Dispose(); tcs.SetResult (10); };

(71)

Asynchroniczny delay – DoAfter

Task DoAfter (int msToDelay){

var tcs = new TaskCompletionSource<object>();

var timer = new System.Timers.Timer (msToDelay) { AutoReset = false };

timer.Elapsed += delegate

{ timer.Dispose(); tcs.SetResult (null); };

timer.Start();

return tcs.Task;

}

DoAfter(1000).GetAwaiter()

.OnCompleted (() => DoSmthgAfter1s ());

//lub

DoAfter(1000). ContinueWith() (ant => DoSmthgAfter1s ());

(72)

Operacje asynchroniczne

Podejście wielowątkowe: Tworzymy kod jako synchroniczny i wywołujemy go w oddzielnym wątku

Podejście asynchroniczne: Funkcja może działać jeszcze po zwróceniu sterowania. Dopóki nie próbujemy uzyskać

wyniku od operacji działającej istotnie długo – nie ma to wpływu na wątek, który ja wywołał

Przykładem są np. ContinueWith/OnContinue 2 typowe scenariusze:

Po stronie serwerowej duza ilość operacji IO

W aplikacji klienckiej uproszenie złożonej logiki wielowątkowej synchronizacji

(73)

Operacje asynchroniczne

Model zalecany dla np. dla metro, SL, wydajnego serwera wielowątkowego (I/O bounded)

Nie startujemy nowych wątków ex-plicite

długie operacje (>=50ms) uruchamiamy jako asynchroniczne

Krótkie operacje robimy w wątku GUI

Zasadniczo można nie robić synchroniazacji dostepu do zasobów

(74)

Operacje asynchroniczne APM

„Zrób swoje” z użyciem CPU, a potem wyjdź i pozwól wywołać Call-back

IAsyncResult BeginXXX (args,

AsyncCallback callback, object state);

public delegate void AsyncCallback (IAsyncResult ar);

return-type EndXXX (IAsyncResult ar);

N.p.: dla stream:

public IAsyncResult BeginRead (byte[] buffer, int offset, int size, AsyncCallback callback, object state);

public int EndRead (IAsyncResult asyncResult);

(75)

IAsyncResult

public interface IAsyncResult {

// "state" object passed to Begin.

object AsyncState { get; } // Signaled when complete.

WaitHandle AsyncWaitHandle { get; }

// Did it complete on BeginX (quick, APN-not supported, //CPU-bond so completed wthout blocking?

bool CompletedSynchronously { get; }

// Has it completed yet?

bool IsCompleted { get; } }

(76)

TPL vs Async

TaskFactory.FromAsync(

BeginXXX method

EndXXX method

Dodatkowe parametry

Task<int> readChunk = Task<int>.Factory.FromAsync ( stream.BeginRead,

stream.EndRead, buffer, 0, 1000, null);

(77)

Przykład wielowątkowy

int GetPrimesCount (int start, int count) {

return ParallelEnumerable.Range (start, count).Count ( n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (

i => n % i > 0));

}

void DisplayPrimeCounts () { const int RANGE = 1000000;

for (int i = 0; i < 10; i++)

Console.WriteLine (" Znaleziono: " +

GetPrimesCount (i*RANGE + 2, RANGE) +

" liczb pierwszych między " + (i * RANGE) +

" oraz " + ((i+1) * RANGE -1));

}

(78)

P. asynchroniczny “naiwny”

Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (

() => ParallelEnumerable.Range (start, count).Count ( n =>Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (

i => n % i > 0)));

}

void DisplayPrimeCounts() {

const int RANGE = 1000000;

for (int ii = 0; ii < 10; ii++) {// kolejnosc ???

var i = ii;

var awaiter = GetPrimesCountAsync (

i* RANGE + 2, RANGE).GetAwaiter();

awaiter.OnCompleted (() => Console.WriteLine (

“Znaleziono:” + awaiter.GetResult() + “... "));

} } Kolejność... + Koniec zanim liczby zostaną wypisane

(79)

Przykład asynchroniczny

void DisplayPrimeCountsFrom (int i, int count ){

const int RANGE = 1000000;

var awaiter = GetPrimesCountAsync (i * RANGE + 2, RANGE).GetAwaiter();

awaiter.OnCompleted (() => {

Console.WriteLine (“Znaleziono:”, awaiter.GetResult()+ "…");

if (count>0) DisplayPrimeCountsFrom (i,count-1);

else Console.WriteLine ("Done");

});

}

void DisplayPrimeCounts() {

DisplayPrimeCountsFrom (0, 10);

}

DisplayPrimeCounts sama nie jest asynchroniczna

(80)

Przykład asynchroniczny II

class PrimesStateMachine { readonly int MaxCount = 10;

const int RANGE = 1000000;

TaskCompletionSource<object> _tcs =

new TaskCompletionSource<object>();

public Task Task { get { return _tcs.Task; } } public void DisplayPrimeCountsFrom (int i) {

var awaiter = GetPrimesCountAsync (

i* RANGE +2, RANGE).GetAwaiter();

awaiter.OnCompleted (() => {

Console.WriteLine (awaiter.GetResult());

if (i++ < MaxCount) DisplayPrimeCountsFrom (i);

else { Console.WriteLine ("Done"); _tcs.SetResult (null); } });

} }

(81)

Przykład asynchroniczny II cd.

Task DisplayPrimeCountsAsync() {

var machine = new PrimesStateMachine();

machine.DisplayPrimeCountsFrom(0);

return machine.Task;

}

(82)

C# >= 5.0

Kod:

var result = await wyrażenie;

Instukcja(je);

Jest rozwijany do:

var awaiter = wyrażenie.GetAwaiter();

awaiter.OnCompleted (() =>

{

var result = awaiter.GetResult();

Instukcja(je);

);

(83)

Przykład asynchroniczny

Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (

() => ParallelEnumerable.Range (start, count).Count (

n =>Enumerable.Range (2, (int)Math.Sqrt(n)-1).All ( i => n % i > 0)));

}

async void DisplayPrimeCounts() {

const int RANGE = 1000000;

for (int i = 0; i < 10; i++) Console.WriteLine (

await GetPrimesCountAsync (i* RANGE +2, RANGE));

}

(84)

async void DisplayPrimesCount() {

int result = await GetPrimesCountAsync (2, 1000000);

Console.WriteLine (result);

}

Odpowiada funkcjonalnie następującemu kodowi void DisplayPrimesCount() {

var awaiter = GetPrimesCountAsync (2, 1000000).GetAwaiter();

awaiter.OnCompleted (() => { int result = awaiter.GetResult();

Console.WriteLine (result);

});

}

(85)

C# >= 5.0 async

Może być zaaplikowany do metod zwracających

void

Task

Task<T>

Nie zmienia sygnatury (podobnie jak unsafe)

W ciele metody napotkanie await zwraca sterownie (podobnie jak yield)

Async może być dodany do labdy i metod anonimowych

(86)

C# >=5.0 await

Typowo wywoływany jest na Task-u

Wystarczy aby przedmiot wołania await miał

Metodę GetAwaiter która zwróci obiekt implementujący

INotifyCompletion.OnCompleted (tj. GetResult zwracające odpowiedni typ i właściwość IsCompleted)

Może wystąpic w metodach asynchronicznych praktycznie wszędzie z wyjątkiem:

catch / finally

Wyrażenia lock,

Kontekstu unsafe

Punktu wejścia do aplikacji (main method).

(87)

Równoległość ponownie

Wołanie metod async bez await powoduje ich równoległe (thread pool) wykonanie.

async Task DoSmthg1() {

… }

async Task DoSmthg2() {

… }

var task1 = DoSmthg1();

var task2 = DoSmthg2();

(88)

Konstrukcja bogatego GUI

Etapy:

Piszemy metody synchronicznie

Zamieniamy synchroniczne wołania na asynchroniczne i wykonujemy na nich await

Z wyjątkiem głównych metod (obsługa zdarzeń w GUI)

zamieniamy typy zwracane na Task lub Task<TResult> tak by można było na nich czekać

Cytaty

Powiązane dokumenty

An algebra B is said to be representable if it is isomorphic to a partitioner algebra for some mad (maximal almost disjoint) family M.. See [2] for

In constructing embeddings of Boolean algebras and extensions of their automorphisms, we have to avoid the following problem: It is well known that there are gaps in P (ω)/fin which

However, a drawback of this spectral characteriza- tion of the decomposability is that if the decomposition of a given Boolean function f is possible, it may not be immediately

The basic idea is to consider sequentially continuous and strictly posi- tive functionals defined on σ-complete Boolean algebras, in the sequel called Mazur functionals..

Four-part semigroups form a new class of semigroups which became im- portant when sets of Boolean operations which are closed under the binary superposition operation f + g := f

We first iden- tify by means of Design Space Exploration specific GNR topolo- gies for 2- and 3-input {AND, NAND, OR, NOR, XOR, XNOR} and demonstrate by means of the

spisał umowę z Dyrekcją Nakładu Książek Szkolnych w Wiedniu, dzięki czemu stał się na wiele lat jedynym dostawcą starannie przygotowanych polskich podręczni- ków szkolnych

Finally, we obtain a characterisation of Boolean algebras that carry a strictly positive nonatomic measure in terms of a chain condition, and we draw the conclusion that under M A +