• Nie Znaleziono Wyników

Wielowątkowość 3

N/A
N/A
Protected

Academic year: 2021

Share "Wielowątkowość 3"

Copied!
102
0
0

Pełen tekst

(1)

Wielowątkowość 3

Marcin Orchel

AGH University of Science and Technology in Poland

(2)

Agenda

Agenda 2 / 102

(3)

1 Pakiet java.util.concurrent

2 Uzupełnienia do przetwarzania współbieżnego

3 Klasyczne problemy synchronizacji

(4)

Pakiet java.util.concurrent

Pakiet java.util.concurrent 4 / 102

(5)

java.util.concurrent

JDK

Java

Language Java Language

Tools &

Tool APIs

java javac javadoc jar javap jdeps Scripting Security Monitoring JConsole VisualVM JMC JFR

JPDA JVM TI IDL RMI Java

DB Deployment Internationalization Web Services Troubleshooting

JRE

Deployment Java Web Start Applet / Java Plug-in

User Interface Toolkits

JavaFX

Swing Java 2D AWT Accessibility

Drag and Drop Input Methods Image I/O Print Service Sound

Java SE API Integration

Libraries IDL JDBC JNDI RMI RMI-IIOP Scripting

Compact Profiles Other Base

Libraries

Beans Security Serialization Extension Mechanism JMX XML JAXP Networking Override Mechanism JNI Date and Time Input/Output Internationalization

lang and util Base Libraries

lang and util

Math Collections Ref Objects Regular Expressions Logging Management Instrumentation Concurrency Utilities

(6)

java.util.concurrent.atomic

Co oznacza operacja atomowa? To oznacza, że nie jest możliwe uzyskanie rezultatu pośredniego operacji. Inna definicja operacje atomowe to operacje podstawowe wolne od wyścigu do danych.

Które operacje są atomowe wg specyfikacji Java?

Akcje elementarne, które dodają wątki do i usuwają wątki ze zbioru oczekującego są atomowe.

W modelu teoretycznym wykonania spójnego sekwencyjnie, każda indywidualna akcja jest atomiczna.

zapis i odczyt zmiennych volatile long lub double są zawsze atomiczne, dla zmiennych long lub double nie będących volatile nie ma takiej gwarancji

zapis i odczyt referencji jest zawsze atomowy

Czy zapis i odczytintjest atomowy? Nie ma wprost informacji o tym w specyfikacji Java.

klasy AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference metoda

boolean compareAndSet(expectedValue, updateValue);

Pakiet java.util.concurrent 6 / 102

(7)

java.util.concurrent.atomic

weakCompareAndSet może zwrócić false czasami bez wyraźnej przyczyny, a więc nawet wtedy gdy wartość w pamięci zgadza się z expectedValue, oraz nie gwarantuje porządku pojawienia-przed, a więc kiedy wątek widzi uaktualnienie tej zmiennej spowodowane tą

metodą, to nie ma gwarancji, że widzi uaktualnienia także innych zmiennych, które były uaktualnione przed weakCompareAndSet.

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {

private AtomicInteger c = new AtomicInteger(0);

public void increment() { c.incrementAndGet();

}

public void decrement() { c.decrementAndGet();

}

(8)

Atomiczność w C++

Typ atomowy jest specjalizacją szablonu atomic.

operacja is_lock_free() - sprawdzenie czy operacje są wolne od blokad

Inicjalizacja zmiennej atomowej nie jest operacją atomową.

Pakiet java.util.concurrent 8 / 102

(9)

Atomiczność w C++

najczęściej operacje atomiczne są zaimplementowane bez blokad, operacja is_lock_free() służy do sprawdzania czy operacje są wolne od blokad

inicjalizacja zmiennej atomowej nie jest operacją atomową, dlatego należy zadbać o to, aby była prosta

(10)

Atomiczność w C++

std::atomic<int> ai(0); //inicjalizacja ai wartością 0 ai = 10; // atomowe ustawienie ai wartości 10

std::cout << ai; //atomowe odczytanie wartości ai ++ai; //atomowe zwiększenie ai do 11

--ai; //atomowe zmniejszenie ai do 10

Pakiet java.util.concurrent 10 / 102

(11)

Atomiczność w C++

std::atomic<bool> valAvailable(false);

auto imptValue = computeImportantValue(); //obliczanie wartości

valAvailable = true; //informowanie innego zadania, że wartość jest dostępna

czy może być zmieniona kolejność instrukcji 2 i 3? Kompilatory widzą te instrukcje jako a = b oraz x = y , a więc są niezależne. Więc kompilator mógłby zamienić te instrukcje miejscami. Jednakże domyślnym modelem pamięci w C++ dla obiektów std::atomic jest spójność sekwencyjna, a więc kolejność wszystkich instrukcji musi być zachowana, więc nie może być ta optymalizacja zrobiona.

zachowanie sekwencyjnej spójności w C++ oznacza, że kompilatory generują kod. które zapewnia, że zrobi to także podległy sprzęt.

(12)

Atomiczność w C++

możliwość użycia poleceń load i store

std::atomic<int> y(x.load()); //odczyt x y.store(x.load()); //ponowny odczyt x

Pakiet java.util.concurrent 12 / 102

(13)

Executors, współbieżność zadaniowa

odseparowaniezarządzania wątkami od reszty aplikacji obiekty, które realizują powyższe towykonawcy

dostępne są 3 interfejsyExecutor– interfejs do uruchamiania nowych zadań, ExecutorService – podinterfejs Executor z dodatkowymi funkcjami pomagającymi w zarządzaniu cyklem życia indywidualnych zadań i egzekutora, ScheduledExecutorService – podinterfejs ExecutorService wspierający przyszłe i okresowe wykonywanie zadań zamiast (new Thread(r)).start(); piszemy e.execute(r);

nowy kod może utworzyć nowy wątek i uruchomić zadanie lub użyć istniejącegowątku roboczego, albo umieścić r w kolejce do

oczekiwania na zwolnienie wątku roboczego

W klasie ExecutorService mamy dodatkowo dostępną metodę submit, która akceptuje obiekty typu Runnable, a także Callable

(14)

Executors

istnieją również metody w ExecutorService umożliwiające podanie większej liczby zadań do wykonania

istnieją również metody do zarządzania zamknięciem wykonawcy W ScheduledExecutorService jest metoda schedule, która wykonuje zadanie z określonym opóźnieniem, ponadto istnieją metody scheduleAtFixedRate, oraz scheduleWithFixedDelay,

uruchamiające zadania wielokrotnie lub ze zdefiniowanymi odstępami.

relacja pojawienia-przed: wszystkie akcje w wątku przed podaniem zadania do ExecutorService pojawiają-się-przed akcjami z tego zadania

Pakiet java.util.concurrent 14 / 102

(15)

Executors

utworzenie wykonawcy z jednym wątkiem ExecutorService executor = Executors.newSingleThreadExecutor();

przesłanie obiektu Runnable executor.execute(runnable);

zakończenie działania obiektu wykonującego executor.shutdown() inne metody to invokeAny, invokeAll, awaitTermination, pobranie wyników po zakończeniu za pomocą

ExecutorCompletionService

wykonawca z dowolną liczbą wątków w puli wątków Executors.newCachedThreadPool

zagwarantowanie stałej liczby wątków w puli wątków Executors.newFixedThreadPool

(16)

ExecutorService

class NetworkService implements Runnable { private final ServerSocket serverSocket;

private final ExecutorService pool;

public NetworkService(int port, int poolSize) throws IOException {

serverSocket = new ServerSocket(port);

pool = Executors.newFixedThreadPool(poolSize);

}

public void run() { // run the service try {

for (;;) {

pool.execute(new Handler(serverSocket.accept()));

}

} catch (IOException ex) { pool.shutdown();

} } }

Pakiet java.util.concurrent 16 / 102

(17)

ExecutorService

class Handler implements Runnable { private final Socket socket;

Handler(Socket socket) { this.socket = socket; } public void run() {

// read and service request on socket }

}

(18)

ExecutorService

void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being

submitted try {

// Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {

pool.shutdownNow(); // Cancel currently executing tasks, [zatrzymanie wykonania wątków oczekujących]

// Wait a while for tasks to respond to being cancelled

if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) {

// (Re-)Cancel if current thread also interrupted pool.shutdownNow();

// Preserve interrupt status

Thread.currentThread().interrupt();

} }

Pakiet java.util.concurrent 18 / 102

(19)

ExecutorService - własna implementacja

public final class GlobalExecutorService {

private static final Logger logger =

LoggerFactory.getLogger(GlobalExecutorService.class);

public static final int THREADS_COUNT = Runtime.getRuntime().availableProcessors();

private static final ExecutorService singleThreadExecutorService;

private static final ExecutorService executorService;

static {

singleThreadExecutorService = Executors.newFixedThreadPool(1);

(20)

ExecutorService - własna implementacja

private GlobalExecutorService() { assert false; }

public static void shutdown() {

GlobalExecutorService.shutdownAndAwaitTermination(singleThreadExecutorService);

GlobalExecutorService.shutdownAndAwaitTermination(executorService);

}

public static ExecutorService getSingleThreadExecutorService() {

return singleThreadExecutorService; }

public static ExecutorService getExecutorService() {

return executorService; }

Pakiet java.util.concurrent 20 / 102

(21)

ExecutorService - własna implementacja

public static void

shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown();

try {

if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {

pool.shutdownNow();

if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {

logger.error("Pool did not terminate");

} }

} catch (InterruptedException ie) {

(22)

Klasa Future

relacja pojawienia-przed: wszystkie akcje w wątku przed podaniem zadania pojawiają-się-przed zwróceniem rezultatu z pomocą Future.get()

Pakiet java.util.concurrent 22 / 102

(23)

Klasa Future

interface ArchiveSearcher { String search(String target); } class App {

ExecutorService executor = ...

ArchiveSearcher searcher = ...

void showSearch(final String target) throws InterruptedException {

Future<String> future

= executor.submit(new Callable<String>() { public String call() {

return searcher.search(target);

}});

displayOtherThings(); // do other things while searching

try {

(24)

Klasa FutureTask

klasa FutureTask to implementacja interfejsu Future, umożliwia ona uruchomienie pojedynczego zadania w osobnym wątku umożliwia podanie obiektu Callable do metody execute FutureTask<String> future =

new FutureTask<String>(new Callable<String>() { public String call() {

return searcher.search(target);

}});

executor.execute(future);

Pakiet java.util.concurrent 24 / 102

(25)

Interfejs CompletionService

możliwość przetwarzania rezultatów zadań w kolejności ich zakończenia

wgrane zadania po zakończeniu są umieszczane w kolejce, do której mamy dostęp za pomocą take()

akcje w wątku przed dodaniem zadania do CompletionService pojawiają-się-przedakcjami z zadania, które pojawiają-się-przed akcjamipo pomyślnym powrocie z take()

(26)

Interfejs CompletionService

void solve(Executor e,

Collection<Callable<Result>> solvers)

throws InterruptedException, ExecutionException { CompletionService<Result> ecs

= new ExecutorCompletionService<Result>(e);

for (Callable<Result> s : solvers) ecs.submit(s);

int n = solvers.size();

for (int i = 0; i < n; ++i) { Result r = ecs.take().get();

if (r != null) use(r);

} }

Pakiet java.util.concurrent 26 / 102

(27)

Alternatywa do Executors

użycie klasy java.util.Timer oraz java.util.TimerTask, zostało zastąpione ScheduledThreadPoolExecutor

(28)

Zadania w C++

mamy parę obiektówfuturei promiseoraz stan współdzielony, zadanie wstawia wynik do obiektu promise, a inne zadanie pobiera wynik z obiektu future. Stan współdzielony zawiera: wartość, bit gotowości (czy wartość jest gotowa do pobrania), zadanie, licznik użyć, dane dotyczące wzajemnego wykluczania,

typ promise to uchwyt do współdzielonego stanu, zadanie zapisuje wynik (set_value(x))

typ packaged_task przechowuje zadanie i parę future-promise klasa future to uchwyt do współdzielonego stanu, zadanie może pobierać wyniki wstawione przez promise

typ shared_future umożliwia odczytanie wyniku future więcej niż jeden raz

async asynchroniczny starter wątków, starter podejmuje decyzje czy uruchomić nowy wątek, dostępne są dwie zasady uruchamiania:

std::launch::async (wykonuje zadanie uruchamiając nowy wątek lub za pomocą istniejącego) i std::launch::deferred (wykonuje zadanie przy wywołaniu get)

Pakiet java.util.concurrent 28 / 102

(29)

Zadania w C++

w podejściu wątkowym mamy

int doAsyncWork(); std::thread t(doAsyncWork);

w podejściu zadaniowym auto fut = std::async(doAsyncWork);, obiektem przekazywanym jest zadanie, zwracany jest obiekt future domyślna zasada uruchamiania wątków jest sumą dwóch

wspomnianych zasad async i deferred, a więc jedna z nich zostanie wybrana przez implementację biblioteki

auto fut1 = std::async(f);, oraz

auto fut2=std::async(std::launch::async | std::launch::deferred, f);

(30)

Klasa CompletableFuture

jest to odpowiednik obiektu promisez c++, różnica jest taka, że CompletableFuture implementuje Future, a więc nie ma podziału na future i promise

możemy zakończyć działanie ustawiając rezultat, metoda complete(T value), rezultat będzie zwrócony przez get

CompletableFuture implementuje CompletionStage

Pakiet java.util.concurrent 30 / 102

(31)

Alternatywa dla wait/notify

jeśli obiekt CompletableFuture posiada możliwość ustawiania i pobierania rezultatu wraz z oczekiwaniem i powiadamianiem, to możemy użyć tego mechanizmu zamiast wait/notify

przykład dla c++

(32)

Alternatywa dla wait/notify

std::promise<void> p; //obiekt promise do kanału komunikacyjnego

... //wykrywanie zdarzenia

p.set_value(); //informowanie zdarzenia reagującego

Pakiet java.util.concurrent 32 / 102

(33)

Alternatywa dla wait/notify

... //przygotowanie do reakcji

p.get_future().wait(); //oczekiwanie na obiekt future odpowiadający obiektowi p

... //reakcja na zdarzenie

(34)

Alternatywa dla wait/notify

porównanie z wait/notify: nie wymagany jest muteks

działa w przypadku gdy zadanie wykrywające ustawi swój obiekt std::promise przed oczekiwaniem (wait), (wait/notify również tak działa ponieważ jest sprawdzana flaga)

w Javie używamy metody get na Future, metoda ta jest odporna na fałszywe wybudzenia

wada: obiekt std::promise może być ustawiony tylko raz

Pakiet java.util.concurrent 34 / 102

(35)

Klasa Lock

pakiet java.util.concurrent.locks interfejs Lock

(36)

Lock vs synchronized

podobnie jak w mechanizmie synchronized tylko jeden wątek może zajmować blokadę w danym czasie, obiektyLock wspierają również mechanizm wait/notify poprzez obiekty Condition

Podstawową zaletą obiektów Lock jest możliwość dalszego

wykonywania wątku, jeśli nie jest możliwe zajęcie blokady. Metoda tryLock kończy się natychmiast lub po ustalonym czasie, kiedy blokada nie jest dostępna. Metoda lockInterruptibly kończy się, kiedy inny wątek wyśle sygnał przerwania przed zajęciem blokady do oczekującego wątku.

Przykładową implementacją jest ReentrantLock. Co oznacza

Reentrant? Wielobieżność. Oznacza to, że wątek może zająć blokadę wielokrotnie bez blokowania siebie. Gdyby blokada nie była reentrant, wtedy wątek zostałby zablokowany przy drugiej próbie zajęcia blokady.

Pakiet java.util.concurrent 36 / 102

(37)

Lock vs synchronized

Konstruktor tej klasy posiada dodatkowy parametr fair, który faworyzuje dostęp dla najdłużej czekającego wątku. W przeciwnym wypadku nie ma żadnych gwarancji odnośnie kolejności dostępu.

Uwaga: parametr ma wpływ na zajmowanie blokady, natomiast nie ma bezpośredniego wpływu na czas procesora zajmowany przez wątek.

(38)

Lock vs synchronized

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.Random;

public class Safelock { static class Friend {

private final String name;

private final Lock lock = new ReentrantLock();

public Friend(String name) { this.name = name;

}

public String getName() { return this.name;

}

Pakiet java.util.concurrent 38 / 102

(39)

Lock vs synchronized

public boolean impendingBow(Friend bower) { Boolean myLock = false;

Boolean yourLock = false;

try {

myLock = lock.tryLock();

yourLock = bower.lock.tryLock();

} finally {

if (! (myLock && yourLock)) { if (myLock) {

lock.unlock();

}

if (yourLock) {

bower.lock.unlock();

}

(40)

Lock vs synchronized

public void bow(Friend bower) { if (impendingBow(bower)) {

try {

System.out.format("%s: %s has"

+ " bowed to me!%n",

this.name, bower.getName());

bower.bowBack(this);

} finally { lock.unlock();

bower.lock.unlock();

} } else {

System.out.format("%s: %s started"

+ " to bow to me, but saw that"

+ " I was already bowing to him.%n", this.name, bower.getName());

} }

Pakiet java.util.concurrent 40 / 102

(41)

Lock vs synchronized

public void bowBack(Friend bower) { System.out.format("%s: %s has" +

" bowed back to me!%n", this.name, bower.getName());

} }

static class BowLoop implements Runnable { private Friend bower;

private Friend bowee;

public BowLoop(Friend bower, Friend bowee) { this.bower = bower;

(42)

Lock vs synchronized

public void run() {

Random random = new Random();

for (;;) { try {

Thread.sleep(random.nextInt(10));

} catch (InterruptedException e) {}

bowee.bow(bower);

} } }

public static void main(String[] args) {

final Friend alphonse = new Friend("Alphonse");

final Friend gaston = new Friend("Gaston");

new Thread(new BowLoop(alphonse, gaston)).start();

new Thread(new BowLoop(gaston, alphonse)).start();

}

} Pakiet java.util.concurrent 42 / 102

(43)

Condition

class BoundedBuffer {

final Lock lock = new ReentrantLock();

final Condition notFull = lock.newCondition();

final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putptr, takeptr, count;

public void put(Object x) throws InterruptedException { lock.lock();

try {

while (count == items.length) notFull.await();

items[putptr] = x;

if (++putptr == items.length) putptr = 0;

++count;

notEmpty.signal();

(44)

Condition

public Object take() throws InterruptedException { lock.lock();

try {

while (count == 0) notEmpty.await();

Object x = items[takeptr];

if (++takeptr == items.length) takeptr = 0;

--count;

notFull.signal();

return x;

} finally { lock.unlock();

} } }

Pakiet java.util.concurrent 44 / 102

(45)

Condition vs wait/notify

możemy mieć wiele obiektów Condition, dla jednego zajęcia blokady, w mechanizmie wait/notify musimy zająć blokady na obu tych obiektach

dwie dodatkowe metody awaitUntil(Date deadline), awaitUninterruptibly()

(46)

Semafor

semaforsłuży do kontrolowania liczby wątków mających dostęp do pewnych zasobów

semaforto zbiór pozwoleń, każde acquire() jest blokowane dopóki pozwolenie nie będzie dostępne, i następnie jest ono zabierane. Każde release() dodaje pozwolenie. Nie ma obiektów związanych z pozwoleniami, jest tylko licznik.

monitor vs semafor: monitor jest zwalniany przez wątek, który go zajął, semafor może być zwalniany przez dowolny wątek, może to być użyteczne do wyjścia z zakleszczenia

binarny semafor kiedy zbiór pozwoleń jest jednoelementowy akcje w wątku przed wywołaniem release pojawiają-się-przed akcjami następującymi po pomyślnym acquire w innym wątku

Pakiet java.util.concurrent 46 / 102

(47)

Semafor

class Pool {

private static final int MAX_AVAILABLE = 100;

private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException { available.acquire();

return getNextAvailableItem();

}

public void putItem(Object x) { if (markAsUnused(x))

available.release();

}

// Not a particularly efficient data structure; just for

(48)

Semafor

protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) {

if (!used[i]) { used[i] = true;

return items[i]; }}

return null; // not reached }

protected synchronized boolean markAsUnused(Object item) {

for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) {

if (used[i]) { used[i] = false;

return true;

} else

return false; }}

return false;

} }

Pakiet java.util.concurrent 48 / 102

(49)

Binarny Semafor vs Lock

Lock musi być zwalniany przez wątek, który go zajął, semafor niekoniecznie. Z tego wynika, że dla semafora może być wywołane release przez wątek bez wcześniejszego wywołania acquire.

(50)

LockSupport

działa podobnie jak binarny semafor metody park i unpark

posiada metodę parkNanos(long nanos) umożliwiającą oczekiwanie na pozwolenie maksymalnie przez określony czas

Pakiet java.util.concurrent 50 / 102

(51)

Blokady w C++

Muteks - obiekt typu mutex, wyróżniamy klasy mutex, recursive_mutex, timed_mutex, recursive_timed_mutex

muteks posiada m.in. metody lock() i try_lock(), unlock(), przy wywołaniu unlock wątekpowinienzajmować mutex, a więc nie jest to wymagane, czyli mutex zachowuje się podobnie do binarnego semafora Może być taka sytuacja, że po wywołaniu unlock(), inny wątek B zajmie ten sam mutex, zwolni go i zniszczy, przed tym jak wątek A powróci z metody unlock()

jeśli mutex wywoła lock ponownie dojdzie do zakleszczenia wątku ze sobą, rozwiązanie recursive_mutex może być wielokrotnie zajmowany przez jeden wątek, tak jak w Javie ReentrantLock

timed_mutex - udostępnia operacje pozwalające zająć mutex tylko

(52)

Blokady w C++

co jeśli nie wywołamy unlock()? Możemy użyć lock_guard lub unique_lock, które automatycznie wywołają unlock() w destruktorze blokada (lock) to obiekt, który trzyma referencję do blokowalnego obiektu, i może odblokować ten obiekt w destruktorze, lock_guard i unique_lock to strażnicy do obiektów, które można zablokować i odblokować (lockable object, np. mutex)

unique_lock udostępnia ponadto operacje na muteksie shared_lock umożliwia transfer blokady do innego obiektu shared_lock

void use(mutex& mtx, vector<string>& vs, int i) {

lock_guard<mutex> g (mtx);

if (i<0) return;

string s = vs[i];

...

}

Pakiet java.util.concurrent 52 / 102

(53)

Blokady w C++

wiele blokad, funkcjavoid lock(L1&, L2&, L3&...) umożliwia zajęcie blokad w kolejności i tym samym zapobiega zakleszczeniu call_once -void call_once(once_flag& flag, Callable&&

func, Args&&... args) - umożliwia uruchomienie jednorazowe podanej funkcji

(54)

Blokady w C++

odpowiednikiem wait/notify i Condition w C++ jest

condition_variable z metodami notify_one(), notify_all(), wait(lck) możemy użyć wait(lck, pred)

void wait(unique_lock<mutex>& lock, Predicate pred); i nie musimy pisać pętli zabezpieczającej dla wait przed

niespodziewanym wybudzeniem

Pakiet java.util.concurrent 54 / 102

(55)

Blokady w C++

std::condition_variable cv;

std::mutex_ m;

bool flag(false);

...//wykrywanie zdarzenia {

std::lock_guard<std::mutex> g(m); //blokowanie m przez konstruktor g

flag = true; //informowanie zadania reagującego } //odblokowanie m przez destruktor g

cv.notify_one(); //informowanie zdarzenia reagującego

(56)

Blokady w C++

... //przygotowanie do reakcji {

std::unique_lock<std::mutex> lk(m);

cv.wait(lk, [] {return flag; }) //użycie lambda w celu uniknięcia fałszywych wybudzeń

... //reakcja na zdarzenie //m jest zablokowany

}

... //kontynuacja reakcji //teraz m jest odblokowany

Pakiet java.util.concurrent 56 / 102

(57)

ReadWriteLock

interfejs ReadWriteLock posiada dwie blokady, jedna dla operacji tylko odczytu, a druga dla zapisu. Blokada odczytu może być zajęta przez wiele wątków, jeśli nie ma wątków zapisujących. Blokada zapisu jest blokadą wykluczającą.

klasa implementująca ReentrantReadWriteLock

(58)

ReadWriteLock

class RWDictionary {

private final Map<String, Data> m = new TreeMap<String, Data>();

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

private final Lock r = rwl.readLock();

private final Lock w = rwl.writeLock();

public Data get(String key) { r.lock();

try { return m.get(key); } finally { r.unlock(); } }

public String[] allKeys() { r.lock();

try { return m.keySet().toArray(); } finally { r.unlock(); }

}

Pakiet java.util.concurrent 58 / 102

(59)

ReadWriteLock

public Data put(String key, Data value) { w.lock();

try { return m.put(key, value); } finally { w.unlock(); }

}

public void clear() { w.lock();

try { m.clear(); } finally { w.unlock(); } }

}

(60)

ReadWriteLock w C++

shared_timed_mutex - muteks czasowy, ale jest możliwość zajęcia mutexu przez wiele wątków (lock_shared()), dostępna jest również operacja lock(), kiedy agent wykonawczy posiada wyłączny dostęp (lock), to nie może być w tym samym czasie innego agenta z zajętym shared lock. Odpowiada to semantyce wielu czytelników i wyłącznego dostępu przez pisarza.

Pakiet java.util.concurrent 60 / 102

(61)

CountDownLatch

zatrzask odliczający pozwala wątkom na oczekiwanie aż zbiór operacji wykonywanych przez inne wątki zakończy się

te pierwsze z nich wywołują metodę await, a druga grupa wątków wywołuje metodę countDown()

class Driver { // ...

void main() throws InterruptedException {

CountDownLatch startSignal = new CountDownLatch(1);

CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal,

doneSignal)).start();

doSomethingElse(); // don’t let run yet startSignal.countDown(); // let all threads proceed

(62)

CountDownLatch

class Worker implements Runnable {

private final CountDownLatch startSignal;

private final CountDownLatch doneSignal;

Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {

this.startSignal = startSignal;

this.doneSignal = doneSignal;

}

public void run() { try {

startSignal.await();

doWork();

doneSignal.countDown();

} catch (InterruptedException ex) {} // return;

}

void doWork() { ... } }

Pakiet java.util.concurrent 62 / 102

(63)

CyclicBarrier

pozwala wątkom na oczekiwanie aż każdyz nich osiągnie pewien punkt, bariera może być ponownie użyta kiedy wątki oczekujące zostaną zwolnione

barrier action jest wykonywana po tym jak ostatni wątek dojdzie do bariery, ale przed zwolnieniem wątków

akcje w wątku przed wywołaniem await()pojawiają-się-przedakcjami z barrier action, które to pojawiają-się-przed akcjami po pomyślnym powrocie z await() w innych wątkach

class Solver { final int N;

final float[][] data;

(64)

CyclicBarrier

class Worker implements Runnable { int myRow;

Worker(int row) { myRow = row; } public void run() {

while (!done()) { processRow(myRow);

try {

barrier.await();

} catch (InterruptedException ex) { return;

} catch (BrokenBarrierException ex) { return;

} } } }

Pakiet java.util.concurrent 64 / 102

(65)

CyclicBarrier

public Solver(float[][] matrix) { data = matrix;

N = matrix.length;

Runnable barrierAction =

new Runnable() { public void run() { mergeRows(...);

}};

barrier = new CyclicBarrier(N, barrierAction);

List<Thread> threads = new ArrayList<Thread>(N);

for (int i = 0; i < N; i++) {

Thread thread = new Thread(new Worker(i));

threads.add(thread);

thread.start();

}

// wait until done

(66)

CyclicBarrier vs CountDownLatch

w CyclicBarrier zdefiniowana jest akcja, która zostanie wykonana jeśli ostatni wątek dojdzie do bariery (do await())

w CyclicBarrier mamy grupę wątków czekających, a w

CountDownLatch mamy grupę wątków czekających i drugą grupę wykonującą countDown()

ta druga grupa wątków w CountDownLatch nie jest blokowana po wywołaniu countDown()

w CyclicBarrier podajemy minimalną liczbę wątków czekających, które muszą dojść do bariery, natomiast w CountDownLatch nie podajemy liczby wątków czekających, a liczbę wątków wywołujących

countDown().

CyclicBarrier posiada metodę reset()

Pakiet java.util.concurrent 66 / 102

(67)

BlockingQueue

dodatkowe struktury doJava Collections Framework

BlockingQueue to kolejka, która dodatkowo wspiera operację oczekiwania, aż kolejka nie będzie pusta w trakcie pobierania elementu, i oczekiwania na miejsce w kolejce podczas dodawania elementu

metody blokujące to put(e) oraz take()

akcje w wątku przed dodaniem elementu do BlockingQueue

pojawiają-się-przedkolejnymi akcjami po dostępie lub usunięciu tego elementu z tej kolejki w innym wątku

(68)

BlockingQueue

class Producer implements Runnable { private final BlockingQueue queue;

Producer(BlockingQueue q) { queue = q; } public void run() {

try {

while (true) { queue.put(produce()); }

} catch (InterruptedException ex) { ... handle ...}

}

Object produce() { ... } }

Pakiet java.util.concurrent 68 / 102

(69)

BlockingQueue

class Consumer implements Runnable { private final BlockingQueue queue;

Consumer(BlockingQueue q) { queue = q; } public void run() {

try {

while (true) { consume(queue.take()); }

} catch (InterruptedException ex) { ... handle ...}

}

void consume(Object x) { ... } }

(70)

BlockingQueue

class Setup { void main() {

BlockingQueue q = new SomeQueueImplementation();

Producer p = new Producer(q);

Consumer c1 = new Consumer(q);

Consumer c2 = new Consumer(q);

new Thread(p).start();

new Thread(c1).start();

new Thread(c2).start();

} }

Pakiet java.util.concurrent 70 / 102

(71)

ConcurrentMap - kolekcje współbieżne

mapa gwarantująca bezpieczeństwo wątków (ang. thread-safe) i atomiczność. Dodatkowe metody (atomiczne) w stosunku do Map to przykładowo remove(Object key, Object value) – usunięcie elementu dla podanego klucza, tylko jeśli mapuje do podanej wartości,

replace(K key, V value) – zastąpienie elementu dla podanego klucza, tylko jeśli mapuje do podanej wartości, putIfAbsent(K key, V value) - jeśli mapa nie zawiera klucza to jest dodany klucz z podaną

wartością, jeśli zawiera to jest zwracana wartość aktualna bezpieczeństwo wątków oznacza, że możemy tej mapy używać w dostępiewielowątkowym

przykładowa implementacja to ConcurrentHashMap. Iteratory nie wyrzucają wyjątku ConcurrentModificationException

akcje w wątku przed dodaniem elementu do ConcurrentMap jako klucz lub wartośćpojawiają-się-przed akcjami następującymi po

(72)

ConcurrentMap - kolekcje współbieżne

private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();

public static String intern(String s) {

String previousValue = map.putIfAbsent(s,s);

return previousValue == null ? s : previousValue;

}

Pakiet java.util.concurrent 72 / 102

(73)

ConcurrentMap - kolekcje współbieżne

zoptymalizowana wersja

warto stosować kolekcje współbieżne zamiast kolekcji zewnętrznie synchronizowanych (Collections.synchronizedMap lub Hashtable) public static String intern(String s) {

String result = map.get(s);

if (result == null) {

result == map.putIfAbsent(s,s);

if (result == null) {

result = s;

}

return result;

(74)

Fork/join

fork/join to implementacja interfejsu ExecutorService, która używa algorytmu work-stealing. Wątki robocze, które nie mają już nic do zrobienia mogą podkraść zadania z innych wątków, które są cały czas zajęte

klasa ForkJoinPool, rozszerzenie AbstractExecutorService, wykonuje zadania abstrakcyjnej klasy ForkJoinTask i implementuje algorytm work-stealing

Pakiet java.util.concurrent 74 / 102

(75)

Fork/join

fork – możliwość asynchronicznego wykonania (nie blokowanie wątku wywołującego fork)

join – oczekiwanie na zakończenie zadania

class Fibonacci extends RecursiveTask<Integer> { final int n;

Fibonacci(int n) { this.n = n; } Integer compute() {

if (n <= 1) return n;

Fibonacci f1 = new Fibonacci(n - 1);

f1.fork();

Fibonacci f2 = new Fibonacci(n - 2);

return f2.compute() + f1.join();

(76)

Fork/join

piszemy kod postaci

if (my portion of the work is small enough) do the work directly

else

split my work into two pieces

invoke the two pieces and wait for the results

kod ten umieszczamy w podklasie ForkJoinTask takiej jak

RecursiveTask (ze zwracaniem rezultatu) lub RecursiveAction (bez zwracania rezultatu)

w zadaniu wywołujemy invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)

Pakiet java.util.concurrent 76 / 102

(77)

Fork/join

następnie tworzymy obiekt z całym zadaniem i przekazujemy go do metody invoke() instancji ForkJoinPool

przykład z zamazaniem rysunku, każdy piksel jest zastępowany średnią z sąsiadujących pikseli

public class ForkBlur extends RecursiveAction { private int[] mSource;

private int mStart;

private int mLength;

private int[] mDestination;

// Processing window size; should be odd.

private int mBlurWidth = 15;

public ForkBlur(int[] src, int start, int length, int[]

dst) {

mSource = src;

(78)

Fork/join

protected void computeDirectly() {

int sidePixels = (mBlurWidth - 1) / 2;

for (int index = mStart; index < mStart + mLength;

index++) {

// Calculate average.

float rt = 0, gt = 0, bt = 0;

for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);

int pixel = mSource[mindex];

rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth;

gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth;

bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth;

Pakiet java.util.concurrent} 78 / 102

(79)

Fork/join

// Reassemble destination pixel.

int dpixel = (0xff000000 ) | (((int)rt) << 16) |

(((int)gt) << 8) | (((int)bt) << 0);

mDestination[index] = dpixel;

} } ...

(80)

Fork/join

protected static int sThreshold = 100000;

protected void compute() { if (mLength < sThreshold) {

computeDirectly();

return;

}

int split = mLength / 2;

invokeAll(new ForkBlur(mSource, mStart, split, mDestination),

new ForkBlur(mSource, mStart + split, mLength - split, mDestination));

}

Pakiet java.util.concurrent 80 / 102

(81)

Fork/join

// source image pixels are in src // destination image pixels are in dst

ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

ForkJoinPool pool = new ForkJoinPool();

pool.invoke(fb);

(82)

Fork/join

metoda parallelSort() w klasie Arrays używa fork/join metody z pakietu java.util.streams używają fork/join

alternatywnie do fork/join do asynchronicznego wykonania możemy użyć CompletableFuture, metoda supplyAsync

fork/join vs CompletableFuture

Pakiet java.util.concurrent 82 / 102

(83)

Uzupełnienia do przetwarzania współbieżnego

(84)

Zakleszczenie (ang. deadlock)

Rysunek 2:Źródło: wikipedia

Uzupełnienia do przetwarzania

współbieżnego 84 / 102

(85)

Zakleszczenie

zakleszczenieto sytuacja, w której co najmniej dwie różne akcje czekają na siebie nawzajem, więc żadna nie może się zakończyć co najmniej dwa wątki czekają na zakończenie innego z oczekujących wątków

każdy wątek posiada zasób wymagany przez inny wątek żaden nie może się zakończyć dopóki nie uzyska innego zasobu

(86)

Zakleszczenie cd.

public class Deadlock { static class Friend {

private final String name;

public Friend(String name) { this.name = name;

}

public String getName() { return this.name;

}

public synchronized void bow(Friend bower) {

System.out.format("%s: %s" + " has bowed to me!%n", this.name, bower.getName());

bower.bowBack(this);

}

Uzupełnienia do przetwarzania

współbieżnego 86 / 102

(87)

Zakleszczenie (ang. deadlock) cd.

public synchronized void bowBack(Friend bower) { System.out.format("%s: %s" + " has bowed back to me!%n",

this.name, bower.getName());

} }

public static void main(String[] args) {

final Friend alphonse = new Friend("Alphonse");

final Friend gaston = new Friend("Gaston");

new Thread(new Runnable() {

public void run() { alphonse.bow(gaston); } }).start();

new Thread(new Runnable() {

(88)

Zakleszczenie (ang. deadlock) cd.

mamy dwa monitory, dla obiektu alphonse i gaston

oba wątki zablokują się przy uruchamianiu metody bowBack

Uzupełnienia do przetwarzania

współbieżnego 88 / 102

(89)

Zagłodzenie (ang. starvation)

jeden wątek z grupy wątków nie może działać przez zbyt długi czas lub ciągle

przykładowo gdyby szybko działające wątki zawsze pierwsze przechodziły przez blokadę, to wolniej działające mogłyby być zagłodzone

nie możemy zakładać żadnej konkretnej kolejności wątków chcących zająć monitor

wątek nie może zakończyć działania, ponieważ nie ma dostępu do współdzielonego zasobu

współdzielony zasób może być przydzielany innym wątkom wątki o niższym priorytecie mogą być zagłodzone

(90)

Uwięzienie (ang. livelock)

Każdy wątek próbuje znaleźć jakąś drogę wyjścia. Wątki próbujące iść dalej, nie są w stanie tego zrobić przez nadmiernie długi czas lub też nigdy

Wątek odpowiada na działanie innego wątku. Jeśli akcja drugiego wątku jest także odpowiedzią na akcje pierwszego wątku, wtedy mamy do czynienia z uwięzieniem

wątki nie są blokowane, są zbyt zajęte odpowiadaniem do siebie, aby wykonywać się dalej

przykład: dwie osoby próbują przejść wąskim korytarzem i każda z nich próbuje przepuścić drugą przesuwając się w prawo lub lewo

Uzupełnienia do przetwarzania

współbieżnego 90 / 102

(91)

Thread-safe

program jestthread-safe (wątkowo bezpieczny), jeśli wykonuje się prawidłowo podczas dostępu wielowątkowego, to znaczy nie występują problemy związane z wielowątkowością takie jak data race,

zakleszczenie, zagłodzenie, livelock, itp.

(92)

Klasyczne problemy synchronizacji

Klasyczne problemy synchronizacji 92 / 102

(93)

Problem producenta i konsumenta

W problemie występują dwa rodzaje procesów: producenti

konsument, którzy dzielą wspólny zasób (zasób związany) - bufor dla produkowanych (konsumowanych) jednostek.

Zadaniem producenta jest wytworzenie produktu i umieszczenie go w buforze

Zadaniem konsumenta jest pobranie produktu z bufora

Zadanie polega na tym, aby producent nie dodawał nowych jednostek gdy bufor jest pełny, a konsument nie pobierał gdy bufor jest pusty producenci produkują wiele jednostek, konsumenci pobierają wiele jednostek

może być wiele producentów i konsumentów

producent musi sygnalizować po zapisaniu do kolejki, że kolejka nie jest pusta

(94)

Problem czytelników i pisarzy

w problemie występują dwa rodzaje procesów: czytelnicy– wszystkie procesy niedokonujące zmian w zasobie, pisarze– pozostałe procesy jednoczesny dostęp do zasobu może uzyskać dowolna liczba

czytelników, pisarz może otrzymać tylko dostęp wyłączny, czyli w tym czasie nie jest możliwe otrzymanie dostępu przez innych pisarzy ani czytelników

problemem jest przepustowość, gdy chcemy maksymalizować przepustowość to może dojść do zagłodzenia pisarzy, umożliwienie aktualizacji może zmniejszać przepustowość

jeśli zastosujemy strategię oczekiwania przez pisarzy dopóki nie będzie czytelnika, dojdzie do zagłodzenia, jeżeli pisarze będą działali zbyt często, spadnie przepustowość

należy znaleźć równowagę

Klasyczne problemy synchronizacji 94 / 102

(95)

Problem ucztujących filozofów

Rysunek 3:Źródło: wikipedia

(96)

Problem ucztujących filozofów

Pięciu filozofów siedzi przy stole i każdy wykonuje jedną z dwóch czynności – albo jegdy jest głodny, alborozmyśla. Każda osoba ma przy sobie widelec po lewej i po prawej stronie. Zakłada się, że zjedzenie potrawy wymaga dwóch widelców. Zakładamy

nieograniczoną ilość jedzenia. Zadanie polega na zaprojektowaniu dyscypliny jedzenia tak aby żaden z nich nie głodował (tzn. każdy z filozofów będzie na przemian jadł i rozmyślał).

filozofowie to wątki, widelce to zasoby, procesy rywalizują o zasoby może dojść dozakleszczenia, kiedy każdy z filozofów zabierze lewy widelec, i będzie czekał na prawy, uwięzienia i spadku przepustowości rozwiązanie, filozof próbuje wziąść dwa widelce, jeśli jeden z nich jest zajęty, to czeka 10 minut na drugi widelec, następnie odkłada on drugi widelec i czeka 10 minut, eliminuje to zakleszczenie, ale może pojawić się uwięzienie, kiedy wszyscy w tym samym czasie wezmą lewy widelec, po 10 minutach odłożą, po kolejnych 10 minutach znowu wezmą lewe widelce

Klasyczne problemy synchronizacji 96 / 102

(97)

Pytania

Czy można wywołując metodę Lock#unlock() zwolnić blokadę założoną przez inny wątek?

(98)

Pytania

Zazwyczaj nie. Dla implementacji ReentrantLock jest rzucany wyjątek IllegalMonitorStateException, jeśli aktualny wątek nie posiada

blokady.

Klasyczne problemy synchronizacji 98 / 102

(99)

Pytania

Jaka jest różnica między ConcurrentHashMap i SynchronizedHashMap?

(100)

Pytania

Ta druga struktura używa blokady na poziomie obiektu, ta pierwsza na niższym poziomie. Ta pierwsza umożliwia modyfikację struktury listy w trakcie odczytu elementów przez inny wątek.

Klasyczne problemy synchronizacji 100 / 102

(101)

Pytania

Czy flaga używana przy wait/notify musi być atomowa?

(102)

Pytania

Nie ma potrzeby, ponieważ zarówno przy wait jak i notify operacje na fladze są wewnątrz bloku synchronizowanego. Poza tym w Javie typ prosty boolean jest i tak już atomowy.

Klasyczne problemy synchronizacji 102 / 102

Cytaty

Powiązane dokumenty

Podstawą procesu edukacyjnego jest komunikacja w relacji nauczyciel – – student i to ona będzie przedmiotem dalszych rozważań, uporządkowa- nych za pomocą metafory

W najwyżej ce- nionych periodykach naukowych udział publikacji odnoszących się do ewolucji i historii świata żywe- go wciąż jest nieproporcjonalnie większy niż udział

niepełnosprawnych ruchowo realizujących jakąś pasję, np. Miniparaolimpiada: zabawy ruchowe „Kto pierwszy?”. 1) Dzieci siadają na dywanie tyłem do mety. Ich zadaniem

Program powinien umożliwiać ustawienie ile razy każdy z filozofów będzie jadł i rozmyślał, liczbę filozofów, maksymalny czas jedzenia za każdym razem, maksymalny czas

• Szczególna teoria względności to współczesna teoria czasu i przestrzeni stanowiąca podstawę opisu zjawisk fizycznych przy dowolnych prędkościach badanych obiektów wówczas,

ASOCJACJA KWALIFIKOWANA – ASOCJACJA Z KWALIFIKATOREM (ZBIOREM ATRYBUTÓW) POZWALA WSKAZAĆ, KTÓRY ATRYBUT JEDNEJ Z KLAS SŁUŻY DO ZAPEWNIENIA UNIKATOWOŚCI ZWIĄZKU (JEST

Z perspektywy Cwietajewej istotna pozostaje paralela między poetą a  „murzynem” i inny związek, w  którym bycie poetą równoważne jest z  byciem „Żydem” (do tej

osoby relatora 1/3 - inne materiały dokumentacyjne