(2);
new
Thread() {
ПОТОКИ ВЫПОЛНЕНИЯ
311
public
void run() {
for
(
int i = 1; i < 4; i++) {
try
{
queue.put("
Java"
+ i); // добавление 3-х
System.out.println("Element " + i + " added");
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new
Thread() {
public
void run() {
try
{
Thread.
sleep(1_000);
// извлечение одного
System.out.println("Element " + queue.take() + " took");
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
В результате будет выведено:
Element 1 added
Element 2 added
Element Java1 took
Element 3 added
Семафоры
Семафор позволяет управлять доступом к ресурсам или просто работой по-
токов на основе запрещений-разрешений. Семафор всегда устанавливается
на предельное положительное число потоков, одновременное функционирова-
ние которых может быть разрешено. При превышении предельного числа все
желающие работать потоки будут приостановлены до освобождения семафора
одним из работающих по его разрешению потоков. Уменьшение счетчика досту-
па производится методами
void acquire() и его оболочки
boolean tryAcquire().
Оба метода занимают семафор, если он свободен. Если же семафор занят,
то метод
tryAcquire() возвращает ложь и пропускает поток дальше, что позво-
ляет при необходимости отказаться от дальнейшей работы потоку, который
не смог получить семафор. Метод
acquire() при невозможности захвата сема-
фора остановит поток до тех пор, пока хотя бы другой поток не освободит се-
мафор. Метод
boolean tryAcquire(
long timeout,
TimeUnit unit) возвращает
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
312
ложь, если время ожидания превышено, т. е. за указанное время поток не полу-
чил от семафора разрешение работать и пропускает поток дальше. Метод
release() освобождает семафор и увеличивает счетчик на единицу. Простое
надежное стандартное взаимодействие методов
acquire() и
release() демон-
стрирует следующий фрагмент.
/* # 17 # базовое решение при использовании семафора */
public void
run() {
try
{
semaphore.acquire();
// код использования защищаемого ресурса
}
catch
(InterruptedException e) {
}
finally
{
semaphore.release();
// освобождение семафора
}
}
Методы
acquire() и
release() в общем случае могут и не вызываться в одном
методе кода. Тогда за корректное и своевременное возвращение семафора бу-
дет ответственен разработчик. Метод
acquire() не пропустит поток до тех пор,
пока счетчик семафора имеет значение ноль.
Методы
acquire(), tryAquire() и
release() имеют перегруженную версию с пара-
метром типа
int. В такой метод можно передать число, на которое изменится значе-
ние счетчика семафора при успешном выполнении метода, в отличие от методов без
параметров, которые всегда изменяют значение счетчика только на единицу.
Для демонстрации работы семафора предлагается задача о пуле ресурсов
с ограниченным числом, в данном случае аудиоканалов, и значительно бо́льшим
числом клиентов, желающих воспользоваться одним из каналов. Каждый кли-
ент получает доступ к каналу, причем пользоваться можно только одним кана-
лом. Если все каналы заняты, то клиент ждет в течение заданного интервала
времени. Если лимит ожидания превышен, генерируется исключение и клиент
уходит, так и не воспользовавшись услугами пула.
Класс
ChannelPool объявляет семафор и очередь из каналов. В методе
getResource() производится запрос к семафору, и в случае успешного его про-
хождения метод извлекает из очереди канал и выдает его в качестве возвраща-
емого значения метода. Метод
returnResource() добавляет экземпляр-канал
к очереди на выдачу и освобождает семафор.
Реализация принципов пула предоставляет возможность повторного ис-
пользования объектов в ситуациях, когда создание нового объекта — дорого-
стоящая процедура с точки зрения задействованных для этого ресурсов вирту-
альной машины. Поэтому при возможности следует объект после использования
не уничтожать, а возвратить его в так называемый «пул объектов» для повтор-
ного использования. Данная стратегия широко используется при организации
пула соединений с базой данных. Реализаций организации пулов существует
ПОТОКИ ВЫПОЛНЕНИЯ
313
достаточно много с различающимися способами извлечения и возврата объектов,
а также способа контроля за объектами и за заполняемостью пула. Поэтому вы-
брать какое-либо решение как абсолютно лучшее для всех случаев невозможно.
// # 18 # пул ресурсов # ChannelPool.java
package
by.bsu.resource.pool;
import
java.util.Queue;
import
java.util.concurrent.Semaphore;
import
java.util.concurrent.TimeUnit;
import
java.util.LinkedList;
import
by.bsu.resource.exception.ResourсeException;
public
class ChannelPool
{
private
final static int POOL_SIZE = 5; // размер пула
private
final Semaphore semaphore = new Semaphore(POOL_SIZE, true);
private
final Queue resources = new LinkedList();
public
ChannelPool(Queue source) {
resources.addAll(source);
}
public
T getResource(long maxWaitMillis) throws ResourсeException {
try
{
if
(semaphore.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS)) {
T
res
=
resources.poll();
return
res;
}
}
catch
(InterruptedException e) {
throw
new ResourсeException(e);
}
throw
new ResourсeException(":превышено время ожидания");
}
public
void returnResource(T res) {
resources.add(res); // возвращение экземпляра в пул
semaphore.release();
}
}
Класс AudioChannel предлагает простейшее описание канала и его исполь-
зования.
// # 19 # канал — ресурс: обычный класс с некоторой информацией # AudioChannel.java
package
by.bsu.resource.pool;
public
class AudioChannel {
private
int сhannellId;
public
AudioChannel(int id) {
super
();
this
.сhannellId = id;
}
public
int getСhannellId() {
return
сhannellId;
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
314
}
public
void setСhannellId(
int id) {
this.сhannellId = id;
}
public
void using() {
try {
// использование канала
Thread.
sleep(
new java.util.Random().nextInt(500));
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
Класс
ResourceException желателен в такого рода задачах, чтобы точно опи-
сать возникающую проблему при работе ресурса, используемого конкурирую-
щими потоками.
// # 20 # исключение, информирующее о сбое в поставке ресурса # ResourceException.java
package
by..resource.exception;
public
class bsu ResourсeException
extends Exception {
public
ResourсeException() {
super();
}
public
ResourсeException(String message, Throwable cause) {
super(message, cause);
}
public
ResourсeException(String message) {
super(message);
}
public
ResourсeException(Throwable cause) {
super(cause);
}
}
Класс
Client представляет поток, запрашивающий ресурс из пула, использую-
щий его некоторое время и возвращающий его обратно в пул.
// # 21 # поток, работающий с ресурсом # Client.java
package
by.bsu.resource.pool;
import
by.bsu.resource.exception.ResourсeException;
public
class Client
extends Thread {
private
boolean reading =
false;
private
ChannelPool
pool;
public
Client (ChannelPool pool) {
this.pool = pool;
}
public
void run() {
ПОТОКИ ВЫПОЛНЕНИЯ
315
AudioChannel channel =
null;
try {
channel
=
pool.getResource(500);
// изменить на 100
reading
=
true
;
System.
out.println("Channel Client #" +
this.getId()
+ " took channel #" + channel.getСhannellId());
channel.using();
}
catch
(ResourсeException e) {
System.
out.println("Client #" +
this.getId() + " lost ->"
+
e.getMessage());
}
finally
{
if (channel !=
null) {
reading
=
false
;
System.
out.println("Channel Client #" +
this.getId() + " : "
+ channel.getСhannellId() + " channel released");
pool.returnResource(channel);
}
}
}
public
boolean isReading() {
return reading;
}
}
Класс
Runner демонстрирует работу пула ресурсов аудиоканалов. При за-
полнении очереди каналов в данном решении необходимо следить, чтобы число
каналов, передаваемых списком в конструктор класса
ChannelPool, совпадало
со значением константы
POOL_SIZE этого же класса. Константа используется
для инициализации семафора и при большем или меньшем размерах передавае-
мого списка возникают коллизии, которые, кстати, есть смысл спровоцировать
и разобраться в причинах и следствиях.
// # 22 # запуск и использование пула # Runner.java
package
by.bsu.resource.main;
import
java.util.LinkedList;
import
by.bsu.resource.pool.AudioChannel;
import
by.bsu.resource.pool.ChannelPool;
import
by.bsu.resource.pool.Client;
public
class Runner {
public
static void main(String[] args) {
LinkedList
list = new LinkedList() {
{
this.add(new AudioChannel(771));
this.add(new AudioChannel(883));
this.add(new AudioChannel(550));
this.add(new AudioChannel(337));
this.add(new AudioChannel(442));
}
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
316
};
ChannelPool
pool = new ChannelPool<>(list);
for (int i = 0; i < 20; i++) {
new Client(pool).start();
}
}
}
Результатом может быть вывод:
Channel Client #8 took channel #771
Channel Client #10 took channel #550
Channel Client #12 took channel #337
Channel Client #14 took channel #442
Channel Client #9 took channel #883
Channel Client #9 : 883 channel released
Channel Client #16 took channel #883
Channel Client #12 : 337 channel released
Channel Client #18 took channel #337
Channel Client #10 : 550 channel released
Channel Client #11 took channel #550
Channel Client #11 : 550 channel released
Channel Client #13 took channel #550
Channel Client #18 : 337 channel released
Channel Client #15 took channel #337
Channel Client #14 : 442 channel released
Channel Client #17 took channel #442
Channel Client #8 : 771 channel released
Channel Client #20 took channel #771
Client #19 lost ->:превышено время ожидания
Client #26 lost ->:превышено время ожидания
Client #24 lost ->:превышено время ожидания
Client #22 lost ->:превышено время ожидания
Client #23 lost ->:превышено время ожидания
Client #25 lost ->:превышено время ожидания
Client #27 lost ->:превышено время ожидания
Client #21 lost ->:превышено время ожидания
Channel Client #16 : 883 channel released
Channel Client #13 : 550 channel released
Channel Client #17 : 442 channel released
Channel Client #15 : 337 channel released
Channel Client #20 : 771 channel released
ПОТОКИ ВЫПОЛНЕНИЯ
317
Барьеры
Многие задачи могут быть разделены на подзадачи и выполняться параллель-
но. По достижении некоторой данной точки всеми параллельными потоками
подводится итог и определяется общий результат. Если стоит задача задержать
заданное число потоков до достижения ими определенной точки синхрониза-
ции, то используются классы-барьеры. После того, как все потоки достигли этой
самой точки, они будут разблокированы и могут продолжать выполнение. Класс
CyclicBarrier определяет минимальное число потоков, которое может быть
остановлено барьером. Кроме этого барьер сам может быть проинициализиро-
ван потоком, который будет запускаться при снятии барьера. Методы
int await()
и
int await(long timeout, TimeUnit unit) останавливают поток, использующий
барьер до тех пор, пока число потоков достигнет заданного числа в классе-барьере.
Метод
await() возвращает порядковый номер достижения потоком барьерной точки.
Метод
boolean isBroken() проверяет состояние барьера. Метод
reset() сбрасывает со-
стояние барьера к моменту инициализации. Метод
int getNumberWaiting() позволя-
ет определить число ожидаемых барьером потоков до его снятия. Экземпляр
CyclicBarrier можно использовать повторно.
Процесс проведения аукциона подразумевает корректное использование клас-
са
CyclicBarrier. Класс
Auction определяет список конкурирующих предложений
от клиентов и размер барьера. Чтобы приложение работало корректно, необходи-
мо, чтобы размер списка совпадал со значением константы
BIDS_NUMBER.
Барьер инициализируется потоком определения победителя торгов, который
запустится после того, как все предложения будут объявлены. Если потоков
будет запущено больше чем размер барьера, то «лишние» предложения могут
быть не учтены при вычислении победителя, если же потоков будет меньше,
то приложение окажется в состоянии deadlock. Для предотвращения подобных
ситуаций
следует использовать метод await() с параметрами.
// # 23 # определение барьера и действия по его окончании # Auction.java
package
by.bsu.auction;
import
java.util.ArrayList;
import
java.util.Collections;
import
java.util.Comparator;
import
java.util.concurrent.CyclicBarrier;
public
class Auction {
private ArrayList
bids;
private CyclicBarrier barrier;
public final int BIDS_NUMBER = 5;
public Auction() {
this.bids = new ArrayList();
this.barrier = new CyclicBarrier(this.BIDS_NUMBER, new Runnable() {
public void run() {
Bid winner = Auction.this.defineWinner();
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
318
System.
out.println("Bid #" + winner.getBidId() + ", price:" + winner.getPrice() + " win!");
}
});
}
public CyclicBarrier getBarrier() {
return barrier;
}
public boolean add(Bid e) {
return bids.add(e);
}
public Bid defineWinner() {
return
Collections.
max(bids,
new Comparator
() {
@Override
public int compare(Bid ob1, Bid ob2) {
return ob1.getPrice() - ob2.getPrice();
}
});
}
}
Класс Bid определяет предложение клиента на аукционе и запрашивает барьер, по-
сле которого клиент либо заплатит за лот, либо будет продолжать работать дальше.
// # 24 # поток, использующий барьер # Bid.java
package
by.bsu.auction;
import
java.util.Random;
import
java.util.concurrent.BrokenBarrierException;
import
java.util.concurrent.CyclicBarrier;
public
class Bid extends Thread {
private
Integer bidId;
private
int price;
private
CyclicBarrier barrier;
public
Bid(int id, int price, CyclicBarrier barrier) {
this.bidId = id;
this.price = price;
this.barrier = barrier;
}
public
Integer getBidId() {
return bidId;
}
public
int getPrice() {
return price;
}
@Override
public
void run() {
try {
System.out.println("Client " + this.bidId + " specifies a price.");
Thread.sleep(new Random().nextInt(3000)); // время на раздумье
// определение уровня повышения цены
ПОТОКИ ВЫПОЛНЕНИЯ
319
int
delta =
new Random().nextInt(50);
price += delta;
System.
out.println("Bid " +
this.bidId + " : " + price);
this.barrier.await(); // остановка у барьера
System.
out.println("Continue to work...");
// проверить кто выиграл
// и оплатить в случае победы ставки
}
catch (BrokenBarrierException e) {
e.printStackTrace();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
// # 25 # инициализация аукциона и его запуск # AuctionRunner.java
package
by.bsu.auction;
import
java.util.Random;
public
class AuctionRunner {
public
static void main(String[ ] args) {
Auction auction =
new Auction();
int startPrice =
new Random().nextInt(100);
for (
int i = 0; i < auction.BIDS_NUMBER; i++) {
Bid
thread
=
new
Bid(i, startPrice, auction.getBarrier());
auction.add(thread);
thread.start();
}
}
}
Результаты работы аукциона:
Client 0 specifies a price.
Client 2 specifies a price.
Client 1 specifies a price.
Client 3 specifies a price.
Client 4 specifies a price.
Bid 4 : 87
Bid 0 : 81
Bid 1 : 93
Bid 2 : 81
Bid 3 : 96
Bid #3, price:96 win!
Continue to work...
Continue to work...
Continue to work...
Continue to work...
Continue to work...
ИСПОЛЬЗОВАНИЕ КЛАССОВ И БИБЛИОТЕК
320
«Щеколда»
Еще один вид барьера представляет класс
CountDownLatch. Экземпляр класса
инициализируется начальным значением числа ожидающих снятия «щеколды»
потоков. В отличие от
CyclicBarrier, метод
await() просто останавливает поток без
всяких изменений значения счетчика. Значение счетчика снижается вызовом мето-
да
countDown(), т. е. «щеколда» сдвигается на единицу. Когда счетчик обнулится,
барьеры, поставленные методом
await(), снимаются для всех ожидающих разре-
шения потоков. Крайне желательно, чтобы метод
await() был вызван раньше, чем
метод
countDown(). Последнему безразлично, вызывался метод
await() или нет,
счетчик все равно будет уменьшен на единицу. Если счетчик равен нулю, то «лиш-
ние»
вызовы метода countDown() будут проигнорированы.
Демонстрацией возможностей класса
CountDownLatch может служить за-
дача выполнения студентами набора заданий (тестов).
Студенту предлагается
для выполнения набор заданий. Он выполняет их и переходит в режим ожида-
ния оценок по всем заданиям, чтобы вычислить среднее значение оценки.
Преподаватель (
Tutor) проверяет задание и после каждого проверенного зада-
ния сдвигает «щеколду» на единицу. Когда все задания студента проверены,
счетчик становится равным нулю и барьер снимается, производятся необходи-
мые вычисления в классе
Student.
// # 26 # поток-студент, выполняющий задания и ожидающий их проверки # Student.java
Достарыңызбен бөлісу: