public
AtomicLong getIndex() {
return index;
}
@Override
public
void run() {
Random random = new Random();
try {
while (true) {
index.addAndGet(random.nextInt(10));
Thread.sleep(random.nextInt(500));
index.addAndGet(-1
*
random.nextInt(10));
Thread.sleep(random.nextInt(500));
}
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
Имеется класс Broker, запрашивающий значение поля index с некоторым
интервалом в миллисекундах.
/* # 10 # получатель значения атомарного поля # Broker.java */
package
by.bsu.market;
import
java.util.Random;
public
class Broker extends Thread {
private
Market market;
private
static final int PAUSE = 500; // in millis
public
Broker(Market market) {
this.market = market;
}
@Override
public
void run() {
try {
while (true) {
System.out.println("Current index: " + market.getIndex());
Thread.sleep(PAUSE);
}
ПОТОКИ ВЫПОЛНЕНИЯ
301
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
Количество экземпляров класса Broker может быть любым, и они постоян-
но с заданным интервалом запрашивают текущее значение index.
/* # 11 # запуск потоков изменения атомарного поля и его отслеживания несколькими
потоками # AtomicDemo.java */
package
by.bsu.market;
import
java.util.concurrent.atomic.AtomicLong;
public
class AtomicDemo {
private
static final int NUMBER_BROKERS = 30;
public
static void main(String[ ] args) {
Market market = new Market(new AtomicLong(100));
market.start();
for
(int i = 0; i < NUMBER_BROKERS; i++) {
new
Broker(market).start();
}
}
}
Атомарность поля обеспечивает получение экземплярами класса Broker
идентичных текущих значений поля index.
Методы synchronized
Нередко возникает ситуация, когда несколько потоков имеют доступ к неко-
торому объекту, проще говоря, пытаются использовать общий ресурс и начина-
ют мешать друг другу. Более того, они могут повредить этот общий ресурс.
Например, когда два потока записывают информацию в файл/объект/поток.
Для контролирования процесса записи может использоваться разделение ре-
сурса с применением ключевого слова synchronized.
В качестве примера будет рассмотрен процесс записи информации в файл
двумя конкурирующими потоками. В методе main() классa SynchroRun созда-
ются два потока. В этом же методе создается экземпляр класса Resource, со-
держащий поле типа FileWriter, связанное с файлом на диске. Экземпляр
Resource передается в качестве параметра обоим потокам. Первый поток запи-
сывает строку методом writing() в экземпляр класса Resource. Второй поток
также пытается сделать запись строки в тот же самый объект Resource. Во из-
бежание одновременной записи такие методы объявляются как synchronized.
Синхронизированный метод изолирует объект, после чего он становится недо-
ступным для других потоков. Изоляция снимается, когда поток полностью
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
302
выполнит соответствующий метод. Другой способ снятия изоляции — вызов
метода wait() из изолированного метода — будет рассмотрен позже.
В примере продемонстрирован вариант синхронизации файла для защиты
от одновременной записи информации в файл двумя различными потоками.
/* # 12 # синхронизация записи информации в файл # SyncThread.java # Resource.java
# SynchroRun.java */
package
by.bsu.synch;
import
java.io.*;
public class
Resource {
private
FileWriter fileWriter;
public
Resource (String file) throws IOException {
// проверка наличия файла
fileWriter
=
new
FileWriter(file, true);
}
public
synchronized void writing(String str, int i) {
try {
fileWriter.append(str + i);
System.out.print(str + i);
Thread.sleep((long)(Math.random() * 50));
fileWriter.append("->" + i + " ");
System.out.print("->" + i + " ");
}
catch
(IOException e) {
System.err.print("ошибка файла: " + e);
}
catch
(InterruptedException e) {
System.err.print("ошибка потока: " + e);
}
}
public
void close() {
try
{
fileWriter.close();
}
catch
(IOException e) {
System.err.print("ошибка закрытия файла: " + e);
}
}
}
package
by.bsu.synch;
public class
SyncThread extends Thread {
private
Resource rs;
public
SyncThread(String name, Resource rs) {
super(name);
this.rs = rs;
}
public
void run() {
for (int i = 0; i < 5; i++) {
rs.writing(getName(), i); // место срабатывания синхронизации
}
}
}
ПОТОКИ ВЫПОЛНЕНИЯ
303
package
by.bsu.synch;
import
java.io.IOException;
public
class SynchroRun {
public
static void main(String[ ] args) {
Resource s = null;
try
{
s
=
new
Resource ("data\\result.txt");
SyncThread
t1
=
new
SyncThread("First", s);
SyncThread
t2
=
new
SyncThread("Second", s);
t1.start();
t2.start();
t1.join();
t2.join();
}
catch
(IOException e) {
System.err.print("ошибка файла: " + e);
}
catch
(InterruptedException e) {
System.err.print("ошибка потока: " + e);
} finally {
s.close();
}
}
}
В результате в файл будет, например, выведено:
First0->0 Second0->0 First1->1 Second1->1 First2->2 Second2->2 First3->3
Second3->3 First4->4 Second4->4
Код построен таким образом, что при отключении синхронизации метода
writing() в случае его вызова одним потоком другой поток может вклиниться
и произвести запись своей информации, несмотря на то, что метод не завершил
запись, инициированную первым потоком.
Вывод в этом случае может быть, например, следующим:
First0Second0->0 Second1->0 First1->1 First2->1 Second2->2 First3->3 First4->2
Second3->3 Second4->4 ->4
Инструкция synchronized
Синхронизировать объект можно не только при помощи методов с соответ-
ствующим модификатором, но и при помощи синхронизированного блока кода.
В этом случае происходит блокировка объекта, указанного в инструкции
synchronized, и он становится недоступным для других синхронизированных
методов и блоков. Такая синхронизация позволяет сузить область синхрониза-
ции, т. е. вывести за пределы синхронизации код, в ней не нуждающийся.
Обычные методы на синхронизацию внимания не обращают, поэтому ответст-
венность за грамотную блокировку объектов ложится на программиста.
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
304
/* # 13 # блокировка объекта потоком # TwoThread.java */
package
by.bsu.instruction;
public
class TwoThread {
public static
int counter = 0;
public
static void main(String args[ ]) {
final
StringBuilder s = new StringBuilder();
new
Thread() {
public
void run() {
synchronized
(s) {
do
{
s.append("A");
System.out.println(s);
try
{
Thread.sleep(100);
}
catch
(InterruptedException e) {
System.err.print(e);
}
} while (TwoThread.counter++ < 2);
}
// конец synchronized
}
}.start();
new
Thread() {
public
void run() {
synchronized
(s) {
while
(TwoThread.counter++ < 6) {
s.append("B");
System.out.println(s);
}
}
// конец synchronized
}.start();
}
}
В результате компиляции и запуска, скорее всего (например, второй поток
может заблокировать объект первым), будет выведено:
A
AA
AAA
AAAB
AAABB
AAABBB
Один из потоков блокирует объект, и до тех пор, пока он не закончит выпол-
нение блока синхронизации, в котором производится изменение значения объ-
екта, ни один другой поток не может вызвать синхронизированный блок для
этого объекта.
}
ПОТОКИ ВЫПОЛНЕНИЯ
305
Если в коде убрать синхронизацию объекта s, то вывод будет другим, так
как другой поток сможет получить доступ к объекту и изменить его раньше,
чем первый закончит выполнение цикла.
Данный пример можно немного изменить для демонстрации «потокобез-
опасности» класса StringBuffer при вызове метода append() на синхронизиро-
ванном экземпляре.
/* # 14 # потокобезопасность класса StringBuffer # BufferThread.java */
package
by.bsu.synchro;
public
class BufferThread {
static
int counter = 0;
static
StringBuffer s = new StringBuffer(); // заменить на StringBuilder
public
static void main(String args[ ]) throws InterruptedException {
new
Thread() {
public void
run() {
synchronized
(s) {
while
(BufferThread.counter++ < 3) {
s.append("A");
System.out.print("> " + counter + " ");
System.out.println(s);
Thread.sleep(500);
}
}
// конец synchronized-блока
}
}.start();
Thread.sleep(100);
while
(BufferThread.counter++ < 6) {
System.out.print("< " + counter + " ");
// в этом месте поток main будет ждать освобождения блокировки объекта s
s.append("B");
System.out.println(s);
}
}
}
Вызов метода на синхронизированном другим потоком объекте класса
StringBuffer приведет к остановке текущего потока до тех пор, пока объект
не будет разблокирован. То есть выведено будет следующее:
> 1 A
< 2 > 3 AA
AAB
< 5 AABB
< 6 AABBB
Если заменить
StringBuffer на StringBuilder, то остановки потока на забло-
кированном объекте не произойдет и вывод будет таким:
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
306
> 1 A
< 2 AB
< 3 ABB
< 4 ABBB
< 5 ABBBB
< 6 ABBBBB
Монитор
Контроль за доступом к объекту-ресурсу обеспечивает понятие монитора.
Монитор экземпляра может иметь только одного владельца. При попытке кон-
курирующего доступа к объекту, чей монитор имеет владельца, желающий за-
блокировать объект-ресурс поток должен подождать освобождения монитора
этого объекта и только после этого завладеть им и начать использование объек-
та-ресурса. Каждый экземпляр любого класса имеет монитор. Методы wait(),
wait(long inmillis), notify(), notifyAll() корректно срабатывают только на эк-
земплярах, чей монитор уже кем-то захвачен. Статический метод захватывает
монитор экземпляра класса Class, того класса, на котором он вызван. Существует
в единственном экземпляре. Нестатический метод захватывает монитор экзем-
пляра класса, на котором он вызван.
Методы wait(), notify() и notifyAll()
Эти методы никогда не переопределяются и используются только в исход-
ном виде. Вызываются только внутри синхронизированного блока или метода
на объекте, монитор которого захвачен текущим потоком. Попытка обращения
к данным методам вне синхронизации или на несинхронизированном объекте
(со свободным монитором) приводит к генерации исключительной ситуации
IllegalMonitorStateException. В примере #15 рассмотрено взаимодействие ме-
тодов wait() и notify() при освобождении и возврате блокировки в synchronized
блоке. Эти методы используются для управления потоками в ситуации, когда
необходимо задать определенную последовательность действий без повторного
запуска потоков.
Метод wait(), вызванный внутри синхронизированного блока или метода,
останавливает выполнение текущего потока и освобождает от блокировки за-
хваченный объект. Возвратить блокировку объекта потоку можно вызовом ме-
тода notify() для одного потока или notifyAll() для всех потоков. Если ожидаю-
щих потоков несколько, то после вызова метода notify() невозможно определить,
какой поток из ожидающих потоков заблокирует объект. Вызов может быть
осуществлен только из другого потока, заблокировавшего в свою очередь
тот же самый объект.
ПОТОКИ ВЫПОЛНЕНИЯ
307
Проиллюстрировать работу указанных методов можно с помощью примера,
когда инициализация полей и манипуляция их значениями производится в раз-
личных потоках.
/* # 15 # взаимодействие wait() и notify() # Payment.java # PaymentRunner.java */
package
by.bsu.synchro;
import
java.util.Scanner;
public
class Payment {
private
int amount;
private
boolean close;
public
int getAmount() {
return
amount;
}
public
boolean isClose() {
return
close;
}
public
synchronized void doPayment() {
try
{
System.out.println("Start payment:");
while
(amount <= 0) {
this
.wait(); // остановка потока и освобождение блокировки
// после возврата блокировки выполнение будет продолжено
}
// код выполнения платежа
close
=
true
;
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println("Payment is closed : " + close);
}
public
void initAmount() {
Scanner scan = new Scanner(System.in);
amount = scan.nextInt();
}
}
package
by.bsu.synchro;
public
class PaymentRunner {
public
static void main(String[] args) throws InterruptedException {
final
Payment payment = new Payment();
new
Thread() {
public
void run() {
payment.doPayment();
// вызов synchronized метода
}
}.start();
Thread.sleep(200);
synchronized
(payment) { // 1-ый блок
System.out.println("Init amount:");
payment.initAmount();
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
308
payment.notify(); // уведомление о возврате блокировки
}
synchronized (payment) { // 2-ой блок
payment.wait(1_000);
System.out.println("ok");
}
}
}
В результате компиляции и запуска при вводе корректного значения для
инициализации поля amount будет запущен процесс проведения платежа.
Задержки потоков методом sleep() используются для точной демонстрации
последовательности действий, выполняемых потоками. Если же в коде приложе-
ния убрать все блоки синхронизации, а также вызовы методов wait() и notify(),
то результатом вычислений, скорее всего, будет ноль, так как вычисление будет
произведено до инициализации полей объекта.
Новые способы управления потоками
Java
всегда предлагала широкие возможности для многопоточного програм-
мирования: потоки — это основа языка. Однако очень часто использование
этих возможностей вызывало трудности: создать и отладить для корректного
функционирования многопоточную программу достаточно сложно.
В версии Java SE 5 языка добавлен пакет java.util.concurrent, возможно-
сти классов которого обеспечивают более высокую производительность, мас-
штабируемость, построение потокобезопасных блоков concurrent классов.
Кроме этого усовершенствован вызов утилит синхронизации, добавлены клас-
сы семафоров и блокировок.
Возможности синхронизации существовали и ранее. Практически это озна-
чало, что синхронизированные экземпляры блокировались, хотя необходимо
это было далеко не всегда. Например, поток, изменяющий одну часть объекта
Hashtable, блокировал работу других потоков, которые хотели бы прочесть
(даже не изменить) совсем другую часть этого объекта. Поэтому введение до-
полнительных возможностей, связанных с синхронизаций потоков и блоки-
ровкой ресурсов, довольно логично.
Ограниченно потокобезопасные (thread safe) коллекции и вспомогательные
классы управления потоками сосредоточены в пакете java.util.concurrent.
Среди них можно отметить:
• параллельные аналоги существующих синхронизированных классов-кол-
лекций ConcurrentHashMap, ConcurrentLinkedQueue — эффективные
аналоги Hashtable и LinkedList;
• классы CopyOnWriteArrayList и CopyOnWriteArraySet, копирующие свое
содержимое при попытке его изменения, причем ранее полученный итератор
будет корректно продолжать работать с исходным набором данных;
ПОТОКИ ВЫПОЛНЕНИЯ
309
• блокирующие очереди BlockingQueue и BlockingDeque, гарантирующие
остановку потока, запрашивающего элемент из пустой очереди до появле-
ния в ней элемента, доступного для извлечения, а также блокирующего по-
ток, пытающийся вставить элемент в заполненную очередь, до тех пор, пока
в очереди не освободится позиция;
• механизм управления заданиями, основанный на возможностях класса
Executor, включающий организацию запуска пула потоков и службы их
планирования;
• классы-барьеры синхронизации, такие как CountDownLatch (заставляет
потоки ожидать завершения заданного числа операций, по окончании чего
все ожидающие потоки освобождаются), Semaphore (предлагает потоку
ожидать завершения действий в других потоках), CyclicBarrier (предлагает
нескольким потокам ожидать момента, когда они все достигнут какой-либо
точки, после чего барьер снимается), Phaser (барьер, контракт которого яв-
ляется расширением возможностей CyclicBarrier, а также частично согла-
совывается с возможностями CountDownLatch);
• класс Exchanger позволяет потокам обмениваться объектами.
В дополнение к перечисленному выше в пакете java.util.concurrent.locks
содержатся дополнительные реализации моделей синхронизации потоков,
а именно:
• интерфейс Lock, поддерживающий ограниченные ожидания снятия блоки-
ровки, прерываемые попытки блокировки, очереди блокировки и установку
ожидания снятия нескольких блокиро вок посредством интерфейса
Condition;
• класс семафор ReentrantLock, добавляющий ранее не существующую
функциональность по отказу от попытки блокировки объекта с возможно-
стью многократного повторения запроса на блокировку и отказа от нее;
• класс ReentrantReadWriteLock позволяет изменять объект только одному
потоку, а читать в это время — нескольким.
Перечисление TimeUnit
Представляет различные единицы измерения времени. В TimeUnit реализо-
ван ряд методов по преобразованию между единицами измерения и по управ-
лению операциями ожидания в потоках в этих единицах. Используется для
информирования методов, работающих со временем, о том, как интерпретиро-
вать заданный параметр времени.
Перечисление TimeUnit может представлять время в семи размерностях-зна-
чениях: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS,
MINUTES, HOURS, DAYS.
Кроме методов преобразования единиц времени представляют интерес ме-
тоды управления потоками:
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
310
void timedWait(Object obj, long timeout) — выполняет метод wait(long
time) для объекта obj класса Object, используя данные единицы измерения;
void timedJoin(Thread thread, long timeout) — выполняет метод join(long
time) на потоке thread, используя данные единицы измерения.
void sleep(long timeout) — выполняет метод sleep(long time) класса Thread,
используя данные единицы измерения.
Блокирующие очереди
Реализации интерфейсов BlockingQueue и BlockingDeque предлагают ме-
тоды по добавлению/извлечению элементов с задержками, а именно:
void put(E e) — добавляет элемент в очередь. Если очередь заполнена,
то ожидает, пока освободится место;
Достарыңызбен бөлісу: |