Transcript Week 8
Practice Session 8
• Lockfree LinkedList
• Blocking queues
• Producers-Consumers pattern
• Callables, Futures and CompletionService
• CountDownLatch
• Thread cancellation:
– Stop
– shouldStop
– Interrupt
Lockfree LinkedList - CompareAndSet
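import java.util.concurrent.atomic.AtomicReference;

public class LinkedList<T> {
    // Head of the list; it is only ever changed atomically, so no lock is needed.
    private AtomicReference<Link<T>> head = new AtomicReference<>(null);

    public void add(T data) {
        Link<T> localHead;
        Link<T> newHead = new Link<>(null, data);
        do {
            localHead = head.get();       // snapshot the current head
            newHead.next = localHead;     // link the new node in front of it
        } while (!head.compareAndSet(localHead, newHead)); // retry if another thread changed head
    }
}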
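The retry loop above can be tried out with a small self-contained sketch. The slide does not show the Link class, so the node type below, the class name LockFreeList, and the size() and demo() helpers are assumptions made for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical node type; the slides use Link<T> without showing its definition.
class Link<T> {
    Link<T> next;
    final T data;
    Link(Link<T> next, T data) { this.next = next; this.data = data; }
}

class LockFreeList<T> {
    private final AtomicReference<Link<T>> head = new AtomicReference<>(null);

    // Same compare-and-set retry loop as on the slide.
    public void add(T data) {
        Link<T> newHead = new Link<>(null, data);
        Link<T> localHead;
        do {
            localHead = head.get();
            newHead.next = localHead;
        } while (!head.compareAndSet(localHead, newHead));
    }

    // Illustrative helper (not on the slide): counts nodes by walking the list.
    public int size() {
        int n = 0;
        for (Link<T> l = head.get(); l != null; l = l.next) n++;
        return n;
    }

    // Starts several threads that each add perThread items, then returns the final size.
    static int demo(int threads, int perThread) throws InterruptedException {
        LockFreeList<Integer> list = new LockFreeList<>();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) list.add(j);
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return list.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(4, 1000)); // no insertion should be lost
    }
}
```

If compareAndSet fails, another thread changed head between the get() and the update, so the loop simply re-reads head and tries again; no thread ever blocks.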
Blocking Queues
• An ordinary queue with a special feature.
• The queue behaves differently in two cases:
– Empty queue case
• Thread wants to pop head element of the queue.
• Thread is blocked until the queue stops being empty
– Full queue case
• Thread wants to add an element to the queue.
• Thread is blocked until the queue stops being full
Java Blocking Queue
• API:
– Package java.util.concurrent
– Interface: BlockingQueue<E>
• “Our” Implementation:
– class MyBlockingQueue<E> implements BlockingQueue<E>
• Java’s Implementation:
– java.util.concurrent.ArrayBlockingQueue<E>
• Functions:
– void put(E o)
• Adds the specified element to this queue, waiting if necessary for space to become
available.
– E take()
• Retrieves and removes the head of this queue, waiting if no elements are present on this
queue.
• API Website:
– http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
– http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ArrayBlockingQueue.html
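As a quick illustration of put() and take(), here is a minimal sketch using ArrayBlockingQueue. The class name BlockingQueueDemo and the capacity of 2 are arbitrary choices for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Puts two items into a queue of capacity 2 and takes them back in FIFO order.
    static String demo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.put("a");
        queue.put("b");               // the queue is now full; a third put() would block
        String first = queue.take();
        String second = queue.take(); // the queue is now empty; another take() would block
        return first + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```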
MyBlockingQueue
• Private Fields:
private ArrayList<E> fList;
• The queue object.
• Not synchronized.
private final int fMax;
• Our queue’s maximum size.
• Received upon construction.
• Fixed Size.
• Constructor:
MyBlockingQueue(int max){
    fList = new ArrayList<>();
    fMax = max;
}
• Private Functions:
private synchronized int getSize(){
return fList.size();
}
My Blocking Queue – put()
public synchronized void put(E obj){
    while(getSize()>=fMax){
        try{
            this.wait();
        } catch (InterruptedException ignored){}
    }
    fList.add(obj);
    // wakeup everybody. If someone is waiting in the take()
    // method, it can now perform the take.
    this.notifyAll();
}
MyBlockingQueue – take()
public synchronized E take(){
    while(getSize()==0){
        try{
            this.wait();
        } catch (InterruptedException ignored){}
    }
    E obj = fList.get(0);
    fList.remove(0);
    // wakeup everybody. If someone is waiting in the put()
    // method, it can now perform the put.
    this.notifyAll();
    return obj;
}
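The wait()/notifyAll() pattern used in put() and take() can be run end to end with the standalone sketch below. It is a variant, not the class from the slides: the name SimpleBoundedQueue is hypothetical, it does not implement the full BlockingQueue interface, and it propagates InterruptedException to the caller instead of ignoring it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class; a bounded queue guarded by the object's own monitor.
class SimpleBoundedQueue<E> {
    private final List<E> items = new ArrayList<>();
    private final int max;

    SimpleBoundedQueue(int max) { this.max = max; }

    public synchronized void put(E obj) throws InterruptedException {
        while (items.size() >= max) {
            wait();              // releases the lock while waiting for free space
        }
        items.add(obj);
        notifyAll();             // wake consumers waiting in take()
    }

    public synchronized E take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();              // releases the lock while waiting for an element
        }
        E obj = items.remove(0);
        notifyAll();             // wake producers waiting in put()
        return obj;
    }

    // A producer pushes 0..n-1 through a queue of capacity 2; returns the sum the consumer saw.
    static int demo(int n) throws InterruptedException {
        SimpleBoundedQueue<Integer> q = new SimpleBoundedQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < n; i++) sum += q.take();
        producer.join();
        return sum;
    }
}
```

The condition is checked in a while loop rather than an if, because a thread woken by notifyAll() may find that another thread has already filled or emptied the queue again.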
The Producer-Consumer Problem
• A classical multi-process (thread) synchronization problem.
• Uses a bounded (fixed-size) queue.
• Two major groups:
– Producer(s):
• A Thread that generates new objects
• Adds them to shared space
– Consumer(s):
• A Thread that removes objects from shared space.
• Uses them.
• Full queue:
– A producer thread is blocked, until a free space is available.
• Empty queue:
– A consumer thread is blocked, until the queue receives a new object.
Producer-Consumer Implementation
• Using ArrayBlockingQueue
• Three Classes:
– Producer implements Runnable
– Consumer implements Runnable
– Driver class (includes main function)
• Code Example:
– Producer-Consumer
• The output of the program does not necessarily reflect its flow, since printing and
accessing the queue are 2 separate operations (the operating system might run the
first command, then suspend the thread and run another thread before this thread
performs the second command).
• Surrounding the 2 commands with a synchronized block solves this problem, but it’s not
advisable since it blocks too much.
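The three-class layout described above might look like the following sketch. The actual course code is not reproduced in this transcript, so the class bodies, the queue capacity of 3, the name ProducerConsumerDriver, and the fixed item count used to end the run are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int count;
    Producer(BlockingQueue<Integer> queue, int count) { this.queue = queue; this.count = count; }

    @Override
    public void run() {
        try {
            for (int i = 0; i < count; i++) queue.put(i); // blocks while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int count;
    final List<Integer> consumed = new ArrayList<>(); // read by the driver only after join()
    Consumer(BlockingQueue<Integer> queue, int count) { this.queue = queue; this.count = count; }

    @Override
    public void run() {
        try {
            for (int i = 0; i < count; i++) consumed.add(queue.take()); // blocks while empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class ProducerConsumerDriver {
    // Runs one producer and one consumer and returns what the consumer received.
    static List<Integer> demo(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        Consumer consumer = new Consumer(queue, count);
        Thread p = new Thread(new Producer(queue, count));
        Thread c = new Thread(consumer);
        p.start();
        c.start();
        p.join();
        c.join();
        return consumer.consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(5));
    }
}
```

Neither class contains any explicit locking: all blocking and waking up is delegated to the queue.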
Callable
• java.util.concurrent.Callable :
• Like Runnable, but:
– Allows Threads to return values.
– Allows Threads to throw exceptions.
– Uses generics to define object types.
– Class Header:
public class <taskName> implements Callable<returnObjectType>
– Required Functions:
• public <returnObjectType> call()
• Same purpose as Runnable’s run()
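A minimal sketch of a Callable that returns a value and may throw an exception. SumTask and CallableDemo are hypothetical names chosen for this example; the executor is used only to run the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical task: sums the integers 1..n.
class SumTask implements Callable<Integer> {
    private final int n;
    SumTask(int n) { this.n = n; }

    @Override
    public Integer call() throws Exception {
        if (n < 0) throw new IllegalArgumentException("n must be non-negative");
        int sum = 0;
        for (int i = 1; i <= n; i++) sum += i;
        return sum; // unlike run(), call() returns a value
    }
}

class CallableDemo {
    // Runs the task on an executor thread and returns its result.
    static int runSum(int n) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(new SumTask(n));
            return future.get(); // an exception thrown by call() surfaces here as ExecutionException
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSum(10));
    }
}
```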
Futures
• Used in combination with the Executor.
• An object that represents the result of a Callable.
• Can retrieve the Callable’s result:
– Using get() method
– The result can be retrieved only when the computation has
completed.
– get() blocks, until value can be retrieved.
• Allows cancellation of Thread execution.
– Using cancel() method
– Cannot cancel a completed Callable.
• Interface:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html
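The get() and cancel() behaviour listed above can be observed with a short sketch. FutureDemo is a hypothetical name, and the 60-second sleep merely stands in for a long computation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    // Returns {result of the quick task, cancel() on the slow task,
    //          cancel() on the already completed task, isCancelled() of the slow task}.
    static Object[] demo() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> quick = executor.submit(() -> "done");
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(60_000); // long work; cancel(true) interrupts this sleep
                return "never";
            });
            String result = quick.get();                  // blocks until the value is ready
            boolean cancelled = slow.cancel(true);        // succeeds: the task has not completed
            boolean cancelAfterDone = quick.cancel(true); // fails: the task already completed
            return new Object[] { result, cancelled, cancelAfterDone, slow.isCancelled() };
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling get() on a cancelled Future throws CancellationException, so the sketch only queries isCancelled() on the slow task.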
Code Example – The Price is Right
• A game where contestants try to guess the price of an item,
given a max and a min value.
• The contestant whose guess is closest to the actual price
wins.
[Figure: Contestant1 through Contestant5 each make a guess between $1 and $1000 and pass it to ThePriceIsRight. Item: Xerox Phaser 5500DN Workgroup Laser Printer. Actual price: $500.]
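One way the game could be modelled with Callables and Futures is sketched below. The course's own code is not part of this transcript; here each Contestant simply returns a guess passed in by the caller (a real contestant would presumably pick a random value), and the winner() helper is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical contestant: its "computation" is to come up with a guess.
class Contestant implements Callable<Integer> {
    private final int guess;
    Contestant(int guess) { this.guess = guess; }

    @Override
    public Integer call() { return guess; }
}

class ThePriceIsRight {
    // Returns the 1-based number of the contestant whose guess is closest to actualPrice.
    static int winner(int actualPrice, int... guesses) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(guesses.length);
        try {
            List<Contestant> contestants = new ArrayList<>();
            for (int g : guesses) contestants.add(new Contestant(g));
            // invokeAll runs every contestant and returns one Future per contestant, in order.
            List<Future<Integer>> futures = executor.invokeAll(contestants);
            int best = -1;
            int bestDiff = Integer.MAX_VALUE;
            for (int i = 0; i < futures.size(); i++) {
                int diff = Math.abs(futures.get(i).get() - actualPrice);
                if (diff < bestDiff) { bestDiff = diff; best = i + 1; }
            }
            return best;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Winner: Contestant" + winner(500, 100, 450, 900, 520, 700));
    }
}
```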
ExecutorCompletionService
• Works in combination with the executor service.
• Once a callable is completed, the returned Future is added to
the ExecutorCompletionService queue.
• Easy fetching: take() (blocking) or poll() returns the next completed Future,
and the Future's get() then returns the result.
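A minimal sketch of fetching results in completion order. CompletionServiceDemo and the two tasks are made up for this example; the 500 ms sleep only makes one task clearly slower than the other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceDemo {
    // Submits a slow and a fast task and collects the results as they complete.
    static List<String> demo() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(executor);
        try {
            service.submit(() -> { Thread.sleep(500); return "slow"; });
            service.submit(() -> "fast");
            List<String> order = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // take() blocks until some task has finished and returns its Future.
                order.add(service.take().get());
            }
            return order; // typically "fast" first, although it was submitted second
        } finally {
            executor.shutdown();
        }
    }
}
```

Compared with looping over a list of Futures and calling get() on each, this avoids waiting on a slow task while faster ones are already finished.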
Notes
• Runnable, Callable are objects, they are not threads.
• Threads receive these objects and run them.
• Threads receive a reference to these objects. Threads do not create a new
copy of the runnable or the callable objects.
• Having a stateful runnable/callable object (one that changes its own fields) and
running it on more than one thread will cause problems if access is not
synchronized!
• Runnable objects cannot throw checked exceptions back to main: the run() method
does not declare "throws Exception".
• Callable objects can pass exceptions back to main: the call() method declares
"throws Exception", and the exception is delivered when the result is fetched with Future.get().
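The point about threads sharing a reference to the same object can be illustrated with a short sketch. Counter and SharedRunnableDemo are hypothetical names; the increment is synchronized precisely because both threads update the same object.

```java
class Counter implements Runnable {
    private int count = 0; // state shared by every thread that runs this object

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) increment();
    }

    // Without synchronized, concurrent count++ operations could overwrite each other.
    private synchronized void increment() { count++; }

    public synchronized int getCount() { return count; }
}

class SharedRunnableDemo {
    // Runs ONE Counter object on two threads and returns the final count.
    static int demo() throws InterruptedException {
        Counter shared = new Counter();  // a single object...
        Thread t1 = new Thread(shared);  // ...referenced by both threads, not copied
        Thread t2 = new Thread(shared);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return shared.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```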
Thread Cancellation
• Thread t = new Thread(…);
• How?
– t.stop() method. (deprecated)
• Good? (doesn’t leave the objects in a stable state)
– Unsafe!
– Releases all object locks instantly.
• Then, how?
– By implementing a “shouldStop” method.
– “shouldStop” checks a flag in the thread.
– If the flag is true, the thread stops working.
• Good?
– Not always!
– Thread might take too long to stop:
• In case the thread is sleeping.
– Thread might not stop at all:
• In case the thread is blocked in wait() and no notify() is on the horizon.
shouldStop Example
class Worker implements Runnable {
    private boolean shouldStop;

    public Worker() {
        shouldStop = false;
    }

    public synchronized void stop() {
        shouldStop = true;
    }

    public synchronized boolean shouldStop() {
        return shouldStop;
    }

    public void run() {
        while (!this.shouldStop()) {
            //your code here;
        }
        System.out.println("stopping…");
    }
}
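A sketch of how such a worker might be driven from another thread. StoppableWorker is a hypothetical variant of the Worker above that uses a volatile flag instead of synchronized accessors (an alternative, not the slide's choice); ShouldStopDemo and the timings are likewise assumptions.

```java
class StoppableWorker implements Runnable {
    // volatile: a write by the stopping thread is visible to the worker thread.
    private volatile boolean shouldStop = false;

    public void stop() { shouldStop = true; }

    @Override
    public void run() {
        while (!shouldStop) {
            Thread.yield(); // stands in for one unit of real work
        }
    }
}

class ShouldStopDemo {
    // Starts the worker, asks it to stop, and reports whether its thread is still alive.
    static boolean demo() throws InterruptedException {
        StoppableWorker worker = new StoppableWorker();
        Thread t = new Thread(worker);
        t.start();
        Thread.sleep(50);   // let the worker run for a moment
        worker.stop();      // request cancellation; the worker notices on its next check
        t.join(1000);       // wait (at most one second) for it to finish
        return t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("still alive: " + demo());
    }
}
```

The worker stops only when it next checks the flag, which is why a worker blocked in sleep() or wait() reacts late or not at all.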
interrupt()
• The interrupt() mechanism.
• Each thread stores an internal flag known as interrupt status.
• Methods:
– t.isInterrupted()
• Checks whether the thread is interrupted or not.
– t.interrupt():
• If t is blocked (wait(), sleep()) then
– InterruptedException is thrown.
– Forces the thread to wake up! From sleep or wait mode.
• Otherwise
– isInterrupted() will return true.
– Behaves the same as shouldStop.
• Note:
– If InterruptedException is thrown, isInterrupted() will return false unless interrupt() is called
again.
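The note about the interrupt status being cleared can be checked with a small sketch. InterruptStatusDemo is a hypothetical name; the long sleep only keeps the thread blocked until it is interrupted.

```java
class InterruptStatusDemo {
    // Returns {status seen inside the catch block, status after calling interrupt() again}.
    static boolean[] demo() throws InterruptedException {
        boolean[] seen = new boolean[2];
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000); // blocked: interrupt() makes sleep() throw
            } catch (InterruptedException e) {
                // Throwing InterruptedException cleared the interrupt status.
                seen[0] = Thread.currentThread().isInterrupted();
                Thread.currentThread().interrupt(); // set the status again
                seen[1] = Thread.currentThread().isInterrupted();
            }
        });
        t.start();
        t.interrupt();
        t.join(); // after join(), the values written by t are visible here
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```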
interrupt() example
class Worker implements Runnable {

    public Worker() { }

    // in normal conditions, your program will just end once the work is done
    public boolean simulationDone() {
        return true;
    }

    public synchronized void doSomeWork() {
        while (!simulationDone() && !Thread.currentThread().isInterrupted()) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // re-raise the interrupt. This is very important!
                break; // get out of the loop (and out of the method)
            }
        }
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            doSomeWork();
        }
        System.out.println("stopping ;)");
    }
}
interrupt() example continued
class Driver {
    public static void main(String[] args) {
        Thread t = new Thread(new Worker());
        t.start();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t.interrupt();
    }
}
java.util.concurrent.CountDownLatch
• What?
– A synchronization aid.
– Allows one or more threads to wait until other threads
complete.
• How?
– A CountDownLatch object is initialized with a starting value.
– The await() method blocks until the current count reaches zero
due to invocations of the countDown() method in other
threads.
– After which all waiting threads are released and any subsequent
invocations of await return immediately.
• Properties:
– The CountDownLatch cannot be reset.
– Well suited to one-time initialization/finalization steps.
– When a waiting point is needed only once, a latch is the
best fit for the job.
• Example:
– In multiplayer games you don’t want any player to
start until all players have joined.
– This procedure works only once at the start of the
game.
• API:
– http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html
• Code Example: server waiting for clients to finish,
before it shuts down.
– CountDownLatch
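The "server waits for its clients" scenario might be sketched as follows. The course's own example is not included in this transcript, so LatchDemo, the client threads, and the counter that stands in for client work are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // The calling thread plays the server: it waits until every client has finished,
    // then returns how many clients completed their work.
    static int demo(int clients) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(clients); // starting value = number of clients
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < clients; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stands in for the client's real work
                done.countDown();           // signal that this client has finished
            }).start();
        }
        done.await();          // blocks until the count reaches zero
        return finished.get(); // safe to "shut down" now: all clients are done
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(5) + " clients finished");
    }
}
```

Once the count has reached zero the latch stays open, so any later call to await() returns immediately.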