Wielowątkowość 3
Marcin Orchel
AGH University of Science and Technology in Poland
Agenda
Agenda 2 / 102
1 Pakiet java.util.concurrent
2 Uzupełnienia do przetwarzania współbieżnego
3 Klasyczne problemy synchronizacji
Pakiet java.util.concurrent
Pakiet java.util.concurrent 4 / 102
java.util.concurrent
JDK
Java
Language Java Language
Tools &
Tool APIs
java javac javadoc jar javap jdeps Scripting Security Monitoring JConsole VisualVM JMC JFR
JPDA JVM TI IDL RMI Java
DB Deployment Internationalization Web Services Troubleshooting
JRE
Deployment Java Web Start Applet / Java Plug-in
User Interface Toolkits
JavaFX
Swing Java 2D AWT Accessibility
Drag and Drop Input Methods Image I/O Print Service Sound
Java SE API Integration
Libraries IDL JDBC JNDI RMI RMI-IIOP Scripting
Compact Profiles Other Base
Libraries
Beans Security Serialization Extension Mechanism JMX XML JAXP Networking Override Mechanism JNI Date and Time Input/Output Internationalization
lang and util Base Libraries
lang and util
Math Collections Ref Objects Regular Expressions Logging Management Instrumentation Concurrency Utilities
java.util.concurrent.atomic
Co oznacza operacja atomowa? To oznacza, że nie jest możliwe uzyskanie rezultatu pośredniego operacji. Inna definicja operacje atomowe to operacje podstawowe wolne od wyścigu do danych.
Które operacje są atomowe wg specyfikacji Java?
Akcje elementarne, które dodają wątki do i usuwają wątki ze zbioru oczekującego są atomowe.
W modelu teoretycznym wykonania spójnego sekwencyjnie, każda indywidualna akcja jest atomiczna.
zapis i odczyt zmiennych volatile long lub double są zawsze atomiczne, dla zmiennych long lub double nie będących volatile nie ma takiej gwarancji
zapis i odczyt referencji jest zawsze atomowy
Czy zapis i odczytintjest atomowy? Nie ma wprost informacji o tym w specyfikacji Java.
klasy AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference metoda
boolean compareAndSet(expectedValue, updateValue);
Pakiet java.util.concurrent 6 / 102
java.util.concurrent.atomic
weakCompareAndSet może zwrócić false czasami bez wyraźnej przyczyny, a więc nawet wtedy gdy wartość w pamięci zgadza się z expectedValue, oraz nie gwarantuje porządku pojawienia-przed, a więc kiedy wątek widzi uaktualnienie tej zmiennej spowodowane tą
metodą, to nie ma gwarancji, że widzi uaktualnienia także innych zmiennych, które były uaktualnione przed weakCompareAndSet.
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() { c.incrementAndGet();
}
public void decrement() { c.decrementAndGet();
}
Atomiczność w C++
Typ atomowy jest specjalizacją szablonu atomic.
operacja is_lock_free() - sprawdzenie czy operacje są wolne od blokad
Inicjalizacja zmiennej atomowej nie jest operacją atomową.
Pakiet java.util.concurrent 8 / 102
Atomiczność w C++
najczęściej operacje atomiczne są zaimplementowane bez blokad, operacja is_lock_free() służy do sprawdzania czy operacje są wolne od blokad
inicjalizacja zmiennej atomowej nie jest operacją atomową, dlatego należy zadbać o to, aby była prosta
Atomiczność w C++
std::atomic<int> ai(0); //inicjalizacja ai wartością 0 ai = 10; // atomowe ustawienie ai wartości 10
std::cout << ai; //atomowe odczytanie wartości ai ++ai; //atomowe zwiększenie ai do 11
--ai; //atomowe zmniejszenie ai do 10
Pakiet java.util.concurrent 10 / 102
Atomiczność w C++
std::atomic<bool> valAvailable(false);
auto imptValue = computeImportantValue(); //obliczanie wartości
valAvailable = true; //informowanie innego zadania, że wartość jest dostępna
czy może być zmieniona kolejność instrukcji 2 i 3? Kompilatory widzą te instrukcje jako a = b oraz x = y , a więc są niezależne. Więc kompilator mógłby zamienić te instrukcje miejscami. Jednakże domyślnym modelem pamięci w C++ dla obiektów std::atomic jest spójność sekwencyjna, a więc kolejność wszystkich instrukcji musi być zachowana, więc nie może być ta optymalizacja zrobiona.
zachowanie sekwencyjnej spójności w C++ oznacza, że kompilatory generują kod. które zapewnia, że zrobi to także podległy sprzęt.
Atomiczność w C++
możliwość użycia poleceń load i store
std::atomic<int> y(x.load()); //odczyt x y.store(x.load()); //ponowny odczyt x
Pakiet java.util.concurrent 12 / 102
Executors, współbieżność zadaniowa
odseparowaniezarządzania wątkami od reszty aplikacji obiekty, które realizują powyższe towykonawcy
dostępne są 3 interfejsyExecutor– interfejs do uruchamiania nowych zadań, ExecutorService – podinterfejs Executor z dodatkowymi funkcjami pomagającymi w zarządzaniu cyklem życia indywidualnych zadań i egzekutora, ScheduledExecutorService – podinterfejs ExecutorService wspierający przyszłe i okresowe wykonywanie zadań zamiast (new Thread(r)).start(); piszemy e.execute(r);
nowy kod może utworzyć nowy wątek i uruchomić zadanie lub użyć istniejącegowątku roboczego, albo umieścić r w kolejce do
oczekiwania na zwolnienie wątku roboczego
W klasie ExecutorService mamy dodatkowo dostępną metodę submit, która akceptuje obiekty typu Runnable, a także Callable
Executors
istnieją również metody w ExecutorService umożliwiające podanie większej liczby zadań do wykonania
istnieją również metody do zarządzania zamknięciem wykonawcy W ScheduledExecutorService jest metoda schedule, która wykonuje zadanie z określonym opóźnieniem, ponadto istnieją metody scheduleAtFixedRate, oraz scheduleWithFixedDelay,
uruchamiające zadania wielokrotnie lub ze zdefiniowanymi odstępami.
relacja pojawienia-przed: wszystkie akcje w wątku przed podaniem zadania do ExecutorService pojawiają-się-przed akcjami z tego zadania
Pakiet java.util.concurrent 14 / 102
Executors
utworzenie wykonawcy z jednym wątkiem ExecutorService executor = Executors.newSingleThreadExecutor();
przesłanie obiektu Runnable executor.execute(runnable);
zakończenie działania obiektu wykonującego executor.shutdown() inne metody to invokeAny, invokeAll, awaitTermination, pobranie wyników po zakończeniu za pomocą
ExecutorCompletionService
wykonawca z dowolną liczbą wątków w puli wątków Executors.newCachedThreadPool
zagwarantowanie stałej liczby wątków w puli wątków Executors.newFixedThreadPool
ExecutorService
class NetworkService implements Runnable { private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize) throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) { pool.shutdown();
} } }
Pakiet java.util.concurrent 16 / 102
ExecutorService
class Handler implements Runnable { private final Socket socket;
Handler(Socket socket) { this.socket = socket; } public void run() {
// read and service request on socket }
}
ExecutorService
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being
submitted try {
// Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks, [zatrzymanie wykonania wątków oczekujących]
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
} }
Pakiet java.util.concurrent 18 / 102
ExecutorService - własna implementacja
public final class GlobalExecutorService {
private static final Logger logger =
LoggerFactory.getLogger(GlobalExecutorService.class);
public static final int THREADS_COUNT = Runtime.getRuntime().availableProcessors();
private static final ExecutorService singleThreadExecutorService;
private static final ExecutorService executorService;
static {
singleThreadExecutorService = Executors.newFixedThreadPool(1);
ExecutorService - własna implementacja
private GlobalExecutorService() { assert false; }
public static void shutdown() {
GlobalExecutorService.shutdownAndAwaitTermination(singleThreadExecutorService);
GlobalExecutorService.shutdownAndAwaitTermination(executorService);
}
public static ExecutorService getSingleThreadExecutorService() {
return singleThreadExecutorService; }
public static ExecutorService getExecutorService() {
return executorService; }
Pakiet java.util.concurrent 20 / 102
ExecutorService - własna implementacja
public static void
shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown();
try {
if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
pool.shutdownNow();
if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
logger.error("Pool did not terminate");
} }
} catch (InterruptedException ie) {
Klasa Future
relacja pojawienia-przed: wszystkie akcje w wątku przed podaniem zadania pojawiają-się-przed zwróceniem rezultatu z pomocą Future.get()
Pakiet java.util.concurrent 22 / 102
Klasa Future
interface ArchiveSearcher { String search(String target); } class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target) throws InterruptedException {
Future<String> future
= executor.submit(new Callable<String>() { public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
Klasa FutureTask
klasa FutureTask to implementacja interfejsu Future, umożliwia ona uruchomienie pojedynczego zadania w osobnym wątku umożliwia podanie obiektu Callable do metody execute FutureTask<String> future =
new FutureTask<String>(new Callable<String>() { public String call() {
return searcher.search(target);
}});
executor.execute(future);
Pakiet java.util.concurrent 24 / 102
Interfejs CompletionService
możliwość przetwarzania rezultatów zadań w kolejności ich zakończenia
wgrane zadania po zakończeniu są umieszczane w kolejce, do której mamy dostęp za pomocą take()
akcje w wątku przed dodaniem zadania do CompletionService pojawiają-się-przedakcjami z zadania, które pojawiają-się-przed akcjamipo pomyślnym powrocie z take()
Interfejs CompletionService
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException { CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers) ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) { Result r = ecs.take().get();
if (r != null) use(r);
} }
Pakiet java.util.concurrent 26 / 102
Alternatywa do Executors
użycie klasy java.util.Timer oraz java.util.TimerTask, zostało zastąpione ScheduledThreadPoolExecutor
Zadania w C++
mamy parę obiektówfuturei promiseoraz stan współdzielony, zadanie wstawia wynik do obiektu promise, a inne zadanie pobiera wynik z obiektu future. Stan współdzielony zawiera: wartość, bit gotowości (czy wartość jest gotowa do pobrania), zadanie, licznik użyć, dane dotyczące wzajemnego wykluczania,
typ promise to uchwyt do współdzielonego stanu, zadanie zapisuje wynik (set_value(x))
typ packaged_task przechowuje zadanie i parę future-promise klasa future to uchwyt do współdzielonego stanu, zadanie może pobierać wyniki wstawione przez promise
typ shared_future umożliwia odczytanie wyniku future więcej niż jeden raz
async asynchroniczny starter wątków, starter podejmuje decyzje czy uruchomić nowy wątek, dostępne są dwie zasady uruchamiania:
std::launch::async (wykonuje zadanie uruchamiając nowy wątek lub za pomocą istniejącego) i std::launch::deferred (wykonuje zadanie przy wywołaniu get)
Pakiet java.util.concurrent 28 / 102
Zadania w C++
w podejściu wątkowym mamy
int doAsyncWork(); std::thread t(doAsyncWork);
w podejściu zadaniowym auto fut = std::async(doAsyncWork);, obiektem przekazywanym jest zadanie, zwracany jest obiekt future domyślna zasada uruchamiania wątków jest sumą dwóch
wspomnianych zasad async i deferred, a więc jedna z nich zostanie wybrana przez implementację biblioteki
auto fut1 = std::async(f);, oraz
auto fut2=std::async(std::launch::async | std::launch::deferred, f);
Klasa CompletableFuture
jest to odpowiednik obiektu promisez c++, różnica jest taka, że CompletableFuture implementuje Future, a więc nie ma podziału na future i promise
możemy zakończyć działanie ustawiając rezultat, metoda complete(T value), rezultat będzie zwrócony przez get
CompletableFuture implementuje CompletionStage
Pakiet java.util.concurrent 30 / 102
Alternatywa dla wait/notify
jeśli obiekt CompletableFuture posiada możliwość ustawiania i pobierania rezultatu wraz z oczekiwaniem i powiadamianiem, to możemy użyć tego mechanizmu zamiast wait/notify
przykład dla c++
Alternatywa dla wait/notify
std::promise<void> p; //obiekt promise do kanału komunikacyjnego
... //wykrywanie zdarzenia
p.set_value(); //informowanie zdarzenia reagującego
Pakiet java.util.concurrent 32 / 102
Alternatywa dla wait/notify
... //przygotowanie do reakcji
p.get_future().wait(); //oczekiwanie na obiekt future odpowiadający obiektowi p
... //reakcja na zdarzenie
Alternatywa dla wait/notify
porównanie z wait/notify: nie wymagany jest muteks
działa w przypadku gdy zadanie wykrywające ustawi swój obiekt std::promise przed oczekiwaniem (wait), (wait/notify również tak działa ponieważ jest sprawdzana flaga)
w Javie używamy metody get na Future, metoda ta jest odporna na fałszywe wybudzenia
wada: obiekt std::promise może być ustawiony tylko raz
Pakiet java.util.concurrent 34 / 102
Klasa Lock
pakiet java.util.concurrent.locks interfejs Lock
Lock vs synchronized
podobnie jak w mechanizmie synchronized tylko jeden wątek może zajmować blokadę w danym czasie, obiektyLock wspierają również mechanizm wait/notify poprzez obiekty Condition
Podstawową zaletą obiektów Lock jest możliwość dalszego
wykonywania wątku, jeśli nie jest możliwe zajęcie blokady. Metoda tryLock kończy się natychmiast lub po ustalonym czasie, kiedy blokada nie jest dostępna. Metoda lockInterruptibly kończy się, kiedy inny wątek wyśle sygnał przerwania przed zajęciem blokady do oczekującego wątku.
Przykładową implementacją jest ReentrantLock. Co oznacza
Reentrant? Wielobieżność. Oznacza to, że wątek może zająć blokadę wielokrotnie bez blokowania siebie. Gdyby blokada nie była reentrant, wtedy wątek zostałby zablokowany przy drugiej próbie zajęcia blokady.
Pakiet java.util.concurrent 36 / 102
Lock vs synchronized
Konstruktor tej klasy posiada dodatkowy parametr fair, który faworyzuje dostęp dla najdłużej czekającego wątku. W przeciwnym wypadku nie ma żadnych gwarancji odnośnie kolejności dostępu.
Uwaga: parametr ma wpływ na zajmowanie blokady, natomiast nie ma bezpośredniego wpływu na czas procesora zajmowany przez wątek.
Lock vs synchronized
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;
public class Safelock { static class Friend {
private final String name;
private final Lock lock = new ReentrantLock();
public Friend(String name) { this.name = name;
}
public String getName() { return this.name;
}
Pakiet java.util.concurrent 38 / 102
Lock vs synchronized
public boolean impendingBow(Friend bower) { Boolean myLock = false;
Boolean yourLock = false;
try {
myLock = lock.tryLock();
yourLock = bower.lock.tryLock();
} finally {
if (! (myLock && yourLock)) { if (myLock) {
lock.unlock();
}
if (yourLock) {
bower.lock.unlock();
}
Lock vs synchronized
public void bow(Friend bower) { if (impendingBow(bower)) {
try {
System.out.format("%s: %s has"
+ " bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
} finally { lock.unlock();
bower.lock.unlock();
} } else {
System.out.format("%s: %s started"
+ " to bow to me, but saw that"
+ " I was already bowing to him.%n", this.name, bower.getName());
} }
Pakiet java.util.concurrent 40 / 102
Lock vs synchronized
public void bowBack(Friend bower) { System.out.format("%s: %s has" +
" bowed back to me!%n", this.name, bower.getName());
} }
static class BowLoop implements Runnable { private Friend bower;
private Friend bowee;
public BowLoop(Friend bower, Friend bowee) { this.bower = bower;
Lock vs synchronized
public void run() {
Random random = new Random();
for (;;) { try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {}
bowee.bow(bower);
} } }
public static void main(String[] args) {
final Friend alphonse = new Friend("Alphonse");
final Friend gaston = new Friend("Gaston");
new Thread(new BowLoop(alphonse, gaston)).start();
new Thread(new BowLoop(gaston, alphonse)).start();
}
} Pakiet java.util.concurrent 42 / 102
Condition
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException { lock.lock();
try {
while (count == items.length) notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
Condition
public Object take() throws InterruptedException { lock.lock();
try {
while (count == 0) notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally { lock.unlock();
} } }
Pakiet java.util.concurrent 44 / 102
Condition vs wait/notify
możemy mieć wiele obiektów Condition, dla jednego zajęcia blokady, w mechanizmie wait/notify musimy zająć blokady na obu tych obiektach
dwie dodatkowe metody awaitUntil(Date deadline), awaitUninterruptibly()
Semafor
semaforsłuży do kontrolowania liczby wątków mających dostęp do pewnych zasobów
semaforto zbiór pozwoleń, każde acquire() jest blokowane dopóki pozwolenie nie będzie dostępne, i następnie jest ono zabierane. Każde release() dodaje pozwolenie. Nie ma obiektów związanych z pozwoleniami, jest tylko licznik.
monitor vs semafor: monitor jest zwalniany przez wątek, który go zajął, semafor może być zwalniany przez dowolny wątek, może to być użyteczne do wyjścia z zakleszczenia
binarny semafor kiedy zbiór pozwoleń jest jednoelementowy akcje w wątku przed wywołaniem release pojawiają-się-przed akcjami następującymi po pomyślnym acquire w innym wątku
Pakiet java.util.concurrent 46 / 102
Semafor
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException { available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) { if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for
Semafor
protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) { used[i] = true;
return items[i]; }}
return null; // not reached }
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) {
if (used[i]) { used[i] = false;
return true;
} else
return false; }}
return false;
} }
Pakiet java.util.concurrent 48 / 102
Binarny Semafor vs Lock
Lock musi być zwalniany przez wątek, który go zajął, semafor niekoniecznie. Z tego wynika, że dla semafora może być wywołane release przez wątek bez wcześniejszego wywołania acquire.
LockSupport
działa podobnie jak binarny semafor metody park i unpark
posiada metodę parkNanos(long nanos) umożliwiającą oczekiwanie na pozwolenie maksymalnie przez określony czas
Pakiet java.util.concurrent 50 / 102
Blokady w C++
Muteks - obiekt typu mutex, wyróżniamy klasy mutex, recursive_mutex, timed_mutex, recursive_timed_mutex
muteks posiada m.in. metody lock() i try_lock(), unlock(), przy wywołaniu unlock wątekpowinienzajmować mutex, a więc nie jest to wymagane, czyli mutex zachowuje się podobnie do binarnego semafora Może być taka sytuacja, że po wywołaniu unlock(), inny wątek B zajmie ten sam mutex, zwolni go i zniszczy, przed tym jak wątek A powróci z metody unlock()
jeśli mutex wywoła lock ponownie dojdzie do zakleszczenia wątku ze sobą, rozwiązanie recursive_mutex może być wielokrotnie zajmowany przez jeden wątek, tak jak w Javie ReentrantLock
timed_mutex - udostępnia operacje pozwalające zająć mutex tylko
Blokady w C++
co jeśli nie wywołamy unlock()? Możemy użyć lock_guard lub unique_lock, które automatycznie wywołają unlock() w destruktorze blokada (lock) to obiekt, który trzyma referencję do blokowalnego obiektu, i może odblokować ten obiekt w destruktorze, lock_guard i unique_lock to strażnicy do obiektów, które można zablokować i odblokować (lockable object, np. mutex)
unique_lock udostępnia ponadto operacje na muteksie shared_lock umożliwia transfer blokady do innego obiektu shared_lock
void use(mutex& mtx, vector<string>& vs, int i) {
lock_guard<mutex> g (mtx);
if (i<0) return;
string s = vs[i];
...
}
Pakiet java.util.concurrent 52 / 102
Blokady w C++
wiele blokad, funkcjavoid lock(L1&, L2&, L3&...) umożliwia zajęcie blokad w kolejności i tym samym zapobiega zakleszczeniu call_once -void call_once(once_flag& flag, Callable&&
func, Args&&... args) - umożliwia uruchomienie jednorazowe podanej funkcji
Blokady w C++
odpowiednikiem wait/notify i Condition w C++ jest
condition_variable z metodami notify_one(), notify_all(), wait(lck) możemy użyć wait(lck, pred)
void wait(unique_lock<mutex>& lock, Predicate pred); i nie musimy pisać pętli zabezpieczającej dla wait przed
niespodziewanym wybudzeniem
Pakiet java.util.concurrent 54 / 102
Blokady w C++
std::condition_variable cv;
std::mutex_ m;
bool flag(false);
...//wykrywanie zdarzenia {
std::lock_guard<std::mutex> g(m); //blokowanie m przez konstruktor g
flag = true; //informowanie zadania reagującego } //odblokowanie m przez destruktor g
cv.notify_one(); //informowanie zdarzenia reagującego
Blokady w C++
... //przygotowanie do reakcji {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] {return flag; }) //użycie lambda w celu uniknięcia fałszywych wybudzeń
... //reakcja na zdarzenie //m jest zablokowany
}
... //kontynuacja reakcji //teraz m jest odblokowany
Pakiet java.util.concurrent 56 / 102
ReadWriteLock
interfejs ReadWriteLock posiada dwie blokady, jedna dla operacji tylko odczytu, a druga dla zapisu. Blokada odczytu może być zajęta przez wiele wątków, jeśli nie ma wątków zapisujących. Blokada zapisu jest blokadą wykluczającą.
klasa implementująca ReentrantReadWriteLock
ReadWriteLock
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) { r.lock();
try { return m.get(key); } finally { r.unlock(); } }
public String[] allKeys() { r.lock();
try { return m.keySet().toArray(); } finally { r.unlock(); }
}
Pakiet java.util.concurrent 58 / 102
ReadWriteLock
public Data put(String key, Data value) { w.lock();
try { return m.put(key, value); } finally { w.unlock(); }
}
public void clear() { w.lock();
try { m.clear(); } finally { w.unlock(); } }
}
ReadWriteLock w C++
shared_timed_mutex - muteks czasowy, ale jest możliwość zajęcia mutexu przez wiele wątków (lock_shared()), dostępna jest również operacja lock(), kiedy agent wykonawczy posiada wyłączny dostęp (lock), to nie może być w tym samym czasie innego agenta z zajętym shared lock. Odpowiada to semantyce wielu czytelników i wyłącznego dostępu przez pisarza.
Pakiet java.util.concurrent 60 / 102
CountDownLatch
zatrzask odliczający pozwala wątkom na oczekiwanie aż zbiór operacji wykonywanych przez inne wątki zakończy się
te pierwsze z nich wywołują metodę await, a druga grupa wątków wywołuje metodę countDown()
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal,
doneSignal)).start();
doSomethingElse(); // don’t let run yet startSignal.countDown(); // let all threads proceed
CountDownLatch
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() { try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... } }
Pakiet java.util.concurrent 62 / 102
CyclicBarrier
pozwala wątkom na oczekiwanie aż każdyz nich osiągnie pewien punkt, bariera może być ponownie użyta kiedy wątki oczekujące zostaną zwolnione
barrier action jest wykonywana po tym jak ostatni wątek dojdzie do bariery, ale przed zwolnieniem wątków
akcje w wątku przed wywołaniem await()pojawiają-się-przedakcjami z barrier action, które to pojawiają-się-przed akcjami po pomyślnym powrocie z await() w innych wątkach
class Solver { final int N;
final float[][] data;
CyclicBarrier
class Worker implements Runnable { int myRow;
Worker(int row) { myRow = row; } public void run() {
while (!done()) { processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) { return;
} catch (BrokenBarrierException ex) { return;
} } } }
Pakiet java.util.concurrent 64 / 102
CyclicBarrier
public Solver(float[][] matrix) { data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...);
}};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
CyclicBarrier vs CountDownLatch
w CyclicBarrier zdefiniowana jest akcja, która zostanie wykonana jeśli ostatni wątek dojdzie do bariery (do await())
w CyclicBarrier mamy grupę wątków czekających, a w
CountDownLatch mamy grupę wątków czekających i drugą grupę wykonującą countDown()
ta druga grupa wątków w CountDownLatch nie jest blokowana po wywołaniu countDown()
w CyclicBarrier podajemy minimalną liczbę wątków czekających, które muszą dojść do bariery, natomiast w CountDownLatch nie podajemy liczby wątków czekających, a liczbę wątków wywołujących
countDown().
CyclicBarrier posiada metodę reset()
Pakiet java.util.concurrent 66 / 102
BlockingQueue
dodatkowe struktury doJava Collections Framework
BlockingQueue to kolejka, która dodatkowo wspiera operację oczekiwania, aż kolejka nie będzie pusta w trakcie pobierania elementu, i oczekiwania na miejsce w kolejce podczas dodawania elementu
metody blokujące to put(e) oraz take()
akcje w wątku przed dodaniem elementu do BlockingQueue
pojawiają-się-przedkolejnymi akcjami po dostępie lub usunięciu tego elementu z tej kolejki w innym wątku
BlockingQueue
class Producer implements Runnable { private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; } public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... } }
Pakiet java.util.concurrent 68 / 102
BlockingQueue
class Consumer implements Runnable { private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; } public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... } }
BlockingQueue
class Setup { void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
} }
Pakiet java.util.concurrent 70 / 102
ConcurrentMap - kolekcje współbieżne
mapa gwarantująca bezpieczeństwo wątków (ang. thread-safe) i atomiczność. Dodatkowe metody (atomiczne) w stosunku do Map to przykładowo remove(Object key, Object value) – usunięcie elementu dla podanego klucza, tylko jeśli mapuje do podanej wartości,
replace(K key, V value) – zastąpienie elementu dla podanego klucza, tylko jeśli mapuje do podanej wartości, putIfAbsent(K key, V value) - jeśli mapa nie zawiera klucza to jest dodany klucz z podaną
wartością, jeśli zawiera to jest zwracana wartość aktualna bezpieczeństwo wątków oznacza, że możemy tej mapy używać w dostępiewielowątkowym
przykładowa implementacja to ConcurrentHashMap. Iteratory nie wyrzucają wyjątku ConcurrentModificationException
akcje w wątku przed dodaniem elementu do ConcurrentMap jako klucz lub wartośćpojawiają-się-przed akcjami następującymi po
ConcurrentMap - kolekcje współbieżne
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();
public static String intern(String s) {
String previousValue = map.putIfAbsent(s,s);
return previousValue == null ? s : previousValue;
}
Pakiet java.util.concurrent 72 / 102
ConcurrentMap - kolekcje współbieżne
zoptymalizowana wersja
warto stosować kolekcje współbieżne zamiast kolekcji zewnętrznie synchronizowanych (Collections.synchronizedMap lub Hashtable) public static String intern(String s) {
String result = map.get(s);
if (result == null) {
result == map.putIfAbsent(s,s);
if (result == null) {
result = s;
}
return result;
Fork/join
fork/join to implementacja interfejsu ExecutorService, która używa algorytmu work-stealing. Wątki robocze, które nie mają już nic do zrobienia mogą podkraść zadania z innych wątków, które są cały czas zajęte
klasa ForkJoinPool, rozszerzenie AbstractExecutorService, wykonuje zadania abstrakcyjnej klasy ForkJoinTask i implementuje algorytm work-stealing
Pakiet java.util.concurrent 74 / 102
Fork/join
fork – możliwość asynchronicznego wykonania (nie blokowanie wątku wywołującego fork)
join – oczekiwanie na zakończenie zadania
class Fibonacci extends RecursiveTask<Integer> { final int n;
Fibonacci(int n) { this.n = n; } Integer compute() {
if (n <= 1) return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
Fork/join
piszemy kod postaci
if (my portion of the work is small enough) do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
kod ten umieszczamy w podklasie ForkJoinTask takiej jak
RecursiveTask (ze zwracaniem rezultatu) lub RecursiveAction (bez zwracania rezultatu)
w zadaniu wywołujemy invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)
Pakiet java.util.concurrent 76 / 102
Fork/join
następnie tworzymy obiekt z całym zadaniem i przekazujemy go do metody invoke() instancji ForkJoinPool
przykład z zamazaniem rysunku, każdy piksel jest zastępowany średnią z sąsiadujących pikseli
public class ForkBlur extends RecursiveAction { private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
// Processing window size; should be odd.
private int mBlurWidth = 15;
public ForkBlur(int[] src, int start, int length, int[]
dst) {
mSource = src;
Fork/join
protected void computeDirectly() {
int sidePixels = (mBlurWidth - 1) / 2;
for (int index = mStart; index < mStart + mLength;
index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth;
Pakiet java.util.concurrent} 78 / 102
Fork/join
// Reassemble destination pixel.
int dpixel = (0xff000000 ) | (((int)rt) << 16) |
(((int)gt) << 8) | (((int)bt) << 0);
mDestination[index] = dpixel;
} } ...
Fork/join
protected static int sThreshold = 100000;
protected void compute() { if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split, mDestination));
}
Pakiet java.util.concurrent 80 / 102
Fork/join
// source image pixels are in src // destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(fb);
Fork/join
metoda parallelSort() w klasie Arrays używa fork/join metody z pakietu java.util.streams używają fork/join
alternatywnie do fork/join do asynchronicznego wykonania możemy użyć CompletableFuture, metoda supplyAsync
fork/join vs CompletableFuture
Pakiet java.util.concurrent 82 / 102
Uzupełnienia do przetwarzania współbieżnego
Zakleszczenie (ang. deadlock)
Rysunek 2:Źródło: wikipedia
Uzupełnienia do przetwarzania
współbieżnego 84 / 102
Zakleszczenie
zakleszczenieto sytuacja, w której co najmniej dwie różne akcje czekają na siebie nawzajem, więc żadna nie może się zakończyć co najmniej dwa wątki czekają na zakończenie innego z oczekujących wątków
każdy wątek posiada zasób wymagany przez inny wątek żaden nie może się zakończyć dopóki nie uzyska innego zasobu
Zakleszczenie cd.
public class Deadlock { static class Friend {
private final String name;
public Friend(String name) { this.name = name;
}
public String getName() { return this.name;
}
public synchronized void bow(Friend bower) {
System.out.format("%s: %s" + " has bowed to me!%n", this.name, bower.getName());
bower.bowBack(this);
}
Uzupełnienia do przetwarzania
współbieżnego 86 / 102
Zakleszczenie (ang. deadlock) cd.
public synchronized void bowBack(Friend bower) { System.out.format("%s: %s" + " has bowed back to me!%n",
this.name, bower.getName());
} }
public static void main(String[] args) {
final Friend alphonse = new Friend("Alphonse");
final Friend gaston = new Friend("Gaston");
new Thread(new Runnable() {
public void run() { alphonse.bow(gaston); } }).start();
new Thread(new Runnable() {
Zakleszczenie (ang. deadlock) cd.
mamy dwa monitory, dla obiektu alphonse i gaston
oba wątki zablokują się przy uruchamianiu metody bowBack
Uzupełnienia do przetwarzania
współbieżnego 88 / 102
Zagłodzenie (ang. starvation)
jeden wątek z grupy wątków nie może działać przez zbyt długi czas lub ciągle
przykładowo gdyby szybko działające wątki zawsze pierwsze przechodziły przez blokadę, to wolniej działające mogłyby być zagłodzone
nie możemy zakładać żadnej konkretnej kolejności wątków chcących zająć monitor
wątek nie może zakończyć działania, ponieważ nie ma dostępu do współdzielonego zasobu
współdzielony zasób może być przydzielany innym wątkom wątki o niższym priorytecie mogą być zagłodzone
Uwięzienie (ang. livelock)
Każdy wątek próbuje znaleźć jakąś drogę wyjścia. Wątki próbujące iść dalej, nie są w stanie tego zrobić przez nadmiernie długi czas lub też nigdy
Wątek odpowiada na działanie innego wątku. Jeśli akcja drugiego wątku jest także odpowiedzią na akcje pierwszego wątku, wtedy mamy do czynienia z uwięzieniem
wątki nie są blokowane, są zbyt zajęte odpowiadaniem do siebie, aby wykonywać się dalej
przykład: dwie osoby próbują przejść wąskim korytarzem i każda z nich próbuje przepuścić drugą przesuwając się w prawo lub lewo
Uzupełnienia do przetwarzania
współbieżnego 90 / 102
Thread-safe
program jestthread-safe (wątkowo bezpieczny), jeśli wykonuje się prawidłowo podczas dostępu wielowątkowego, to znaczy nie występują problemy związane z wielowątkowością takie jak data race,
zakleszczenie, zagłodzenie, livelock, itp.
Klasyczne problemy synchronizacji
Klasyczne problemy synchronizacji 92 / 102
Problem producenta i konsumenta
W problemie występują dwa rodzaje procesów: producenti
konsument, którzy dzielą wspólny zasób (zasób związany) - bufor dla produkowanych (konsumowanych) jednostek.
Zadaniem producenta jest wytworzenie produktu i umieszczenie go w buforze
Zadaniem konsumenta jest pobranie produktu z bufora
Zadanie polega na tym, aby producent nie dodawał nowych jednostek gdy bufor jest pełny, a konsument nie pobierał gdy bufor jest pusty producenci produkują wiele jednostek, konsumenci pobierają wiele jednostek
może być wiele producentów i konsumentów
producent musi sygnalizować po zapisaniu do kolejki, że kolejka nie jest pusta
Problem czytelników i pisarzy
w problemie występują dwa rodzaje procesów: czytelnicy– wszystkie procesy niedokonujące zmian w zasobie, pisarze– pozostałe procesy jednoczesny dostęp do zasobu może uzyskać dowolna liczba
czytelników, pisarz może otrzymać tylko dostęp wyłączny, czyli w tym czasie nie jest możliwe otrzymanie dostępu przez innych pisarzy ani czytelników
problemem jest przepustowość, gdy chcemy maksymalizować przepustowość to może dojść do zagłodzenia pisarzy, umożliwienie aktualizacji może zmniejszać przepustowość
jeśli zastosujemy strategię oczekiwania przez pisarzy dopóki nie będzie czytelnika, dojdzie do zagłodzenia, jeżeli pisarze będą działali zbyt często, spadnie przepustowość
należy znaleźć równowagę
Klasyczne problemy synchronizacji 94 / 102
Problem ucztujących filozofów
Rysunek 3:Źródło: wikipedia
Problem ucztujących filozofów
Pięciu filozofów siedzi przy stole i każdy wykonuje jedną z dwóch czynności – albo jegdy jest głodny, alborozmyśla. Każda osoba ma przy sobie widelec po lewej i po prawej stronie. Zakłada się, że zjedzenie potrawy wymaga dwóch widelców. Zakładamy
nieograniczoną ilość jedzenia. Zadanie polega na zaprojektowaniu dyscypliny jedzenia tak aby żaden z nich nie głodował (tzn. każdy z filozofów będzie na przemian jadł i rozmyślał).
filozofowie to wątki, widelce to zasoby, procesy rywalizują o zasoby może dojść dozakleszczenia, kiedy każdy z filozofów zabierze lewy widelec, i będzie czekał na prawy, uwięzienia i spadku przepustowości rozwiązanie, filozof próbuje wziąść dwa widelce, jeśli jeden z nich jest zajęty, to czeka 10 minut na drugi widelec, następnie odkłada on drugi widelec i czeka 10 minut, eliminuje to zakleszczenie, ale może pojawić się uwięzienie, kiedy wszyscy w tym samym czasie wezmą lewy widelec, po 10 minutach odłożą, po kolejnych 10 minutach znowu wezmą lewe widelce
Klasyczne problemy synchronizacji 96 / 102
Pytania
Czy można wywołując metodę Lock#unlock() zwolnić blokadę założoną przez inny wątek?
Pytania
Zazwyczaj nie. Dla implementacji ReentrantLock jest rzucany wyjątek IllegalMonitorStateException, jeśli aktualny wątek nie posiada
blokady.
Klasyczne problemy synchronizacji 98 / 102
Pytania
Jaka jest różnica między ConcurrentHashMap i SynchronizedHashMap?
Pytania
Ta druga struktura używa blokady na poziomie obiektu, ta pierwsza na niższym poziomie. Ta pierwsza umożliwia modyfikację struktury listy w trakcie odczytu elementów przez inny wątek.
Klasyczne problemy synchronizacji 100 / 102
Pytania
Czy flaga używana przy wait/notify musi być atomowa?
Pytania
Nie ma potrzeby, ponieważ zarówno przy wait jak i notify operacje na fladze są wewnątrz bloku synchronizowanego. Poza tym w Javie typ prosty boolean jest i tak już atomowy.
Klasyczne problemy synchronizacji 102 / 102