
Concurrent Programming
CS 3331
Fall 2009
Outline
What are threads?
 How to create threads?
 How to control threads?
 Thread safety and synchronization
 BoundedQueue example

Concurrent Programming
Shared memory vs. message-based (e.g., IPC)
 Built-in language features vs. library-based

Multithreaded Programming

Thread
 A single sequential flow of control within a program, i.e., a lightweight flow that executes concurrently with other threads within a single process.
 Communicates with other threads via shared variables and objects.
 Managed by the JVM.
[Figure: flow of control, sequential vs. concurrent]
Why?
Can take advantage of multiple CPUs.
 Allows a server to handle multiple clients simultaneously.
 Suitable for developing reactive systems.
 Makes applications more responsive to user input (e.g., GUI programs such as the BouncingBall applet).
 A natural way to model the real world.

But,
More difficult to program and debug (e.g.,
shared access to objects)
 Nondeterministic order of thread execution
 Safety and liveness problems

Creating Threads in Java

Two approaches
 By extending the java.lang.Thread class
 By implementing the java.lang.Runnable interface
Creating Threads (Cont.)

Extending the Thread class
public class MyThread extends Thread {
    public void run() {
        // the thread body
    }
    // other methods and fields.
}

// Start a new thread by calling the start method
MyThread thread = new MyThread();
thread.start();
// …

[Figure: flow of control from start() to run()]
The run method is the unit of concurrency in Java.
Creating Threads (Cont.)

Implementing the Runnable interface
public interface Runnable {
    void run();
}

public class MyThread implements Runnable {
    public void run() {
        // the thread body
    }
    // other methods and fields.
}
// Start a new thread by creating an instance and calling the start method, e.g.,
new Thread(new MyThread()).start();
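
The two approaches can be compared side by side. The following is a minimal sketch, assuming Java 8 or later; the class names HelloThread, HelloTask, and CreateDemo are hypothetical and not part of the course code. It starts one thread of each kind and uses join() to wait for both.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Approach 1: extend Thread and override run().
class HelloThread extends Thread {
    private final List<String> log;
    HelloThread(List<String> log) { this.log = log; }
    @Override
    public void run() { log.add("subclass"); }
}

// Approach 2: implement Runnable and hand the task to a Thread.
class HelloTask implements Runnable {
    private final List<String> log;
    HelloTask(List<String> log) { this.log = log; }
    @Override
    public void run() { log.add("runnable"); }
}

class CreateDemo {
    // Starts one thread of each kind and returns what they logged.
    static List<String> runBoth() {
        List<String> log = new CopyOnWriteArrayList<>(); // thread-safe list
        Thread t1 = new HelloThread(log);
        Thread t2 = new Thread(new HelloTask(log));
        t1.start();
        t2.start();
        try {
            t1.join(); // wait for both threads to finish
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt request
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runBoth()); // the order of the two entries may vary
    }
}
```

With the Runnable approach the task class remains free to extend some other class, which is the usual reason to prefer it.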
Example --- Simple Counter
public class Counter extends Thread {
    private int count, inc, delay;

    public Counter(int init, int inc, int delay) {
        this.count = init; this.inc = inc; this.delay = delay;
    }

    public void run() {
        try {
            for (;;) {
                System.out.print(count + " ");
                count += inc;
                sleep(delay);
            }
        } catch (InterruptedException e) {}
    }

    public static void main(String[] args) {
        new Counter(0, 1, 33).start();
        new Counter(0, -1, 100).start();
    }
}
Example (Cont.)
Output:
0 0 1 2 -1 3 4 5 -2 6 7 8 -3 9 10 -4 11 12 13 -5 14 15 16 -6 17 18 -7 19
20 21 -8 22 23 24 -9 25 26 -10 27 28 -11 29 30 31 -12 32 33 34 -13 35
36 37 -14 38 39 -15 40 41 42 -16 43 44 45 …
Q: What will happen if we change the statement “new
Counter().start()” to “new Counter().run()” in the main
method?
Exercise
Write a thread class named Talker that continuously prints a hello message,
say “Hello, I am Tom!”, and create two instances of Talker as separate threads.
public class Talker _______________________ {
    private String name;

    public Talker(String name) {
        this.name = name;
    }
}
Controlling Threads

Life cycle of threads

New
 After creation but before the start() call
Alive
 After the start() call; run() is implicitly called.
 Consists of two substates
  Runnable: Ready to run; may be running or waiting for its turn
  Non-runnable: Not ready to run; waiting for some event (e.g., sleeping, blocked, or waiting)
Dead
 When the run() method returns
[Figure: thread state diagram with states New, Alive (Runnable, Non-runnable), and Dead; transitions labeled start, yield, sleep, join, and end of run]
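
The New and Dead states can be observed from code. The following is a small sketch, assuming Java 8 or later, where Thread.getState() reports these two states as NEW and TERMINATED; the class name LifeCycleDemo is hypothetical.

```java
class LifeCycleDemo {
    // Returns the states observed before start() and after the thread ends.
    static Thread.State[] observe() {
        Thread t = new Thread(() -> { /* empty thread body */ });
        Thread.State before = t.getState();   // NEW: created but not yet started
        t.start();                            // the thread is now alive
        try {
            t.join();                         // wait until run() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State after = t.getState();    // TERMINATED: the "dead" state
        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) {
        Thread.State[] s = observe();
        System.out.println(s[0] + " -> " + s[1]); // prints: NEW -> TERMINATED
    }
}
```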
Methods of Thread Class
Method            Description
start()           Enter the Runnable state from the New state
*sleep()          Enter the Non-runnable state
join()            Enter the Non-runnable state and wait for another thread to terminate
*yield()          Release the CPU and remain in the Runnable state
interrupt()       Interrupt the thread; causes InterruptedException if it is sleeping, waiting, or joining
isAlive()         True if in the Alive state
isInterrupted()   True if interrupted
* static method
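
Several of these methods can be seen working together. The sketch below, assuming Java 8 or later, puts a thread to sleep, interrupts it, and then waits for it with join(); the class name InterruptDemo and the one-minute sleep are illustrative choices only.

```java
class InterruptDemo {
    // Starts a sleeping thread, interrupts it, and reports whether the
    // sleeper received an InterruptedException and then terminated.
    static boolean interruptSleeper() {
        final boolean[] interrupted = { false };
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000);         // Non-runnable for up to a minute
            } catch (InterruptedException e) {
                interrupted[0] = true;        // woken early by interrupt()
            }
        });
        sleeper.start();
        sleeper.interrupt();                  // request interruption
        try {
            sleeper.join();                   // wait for the sleeper to terminate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted[0] && !sleeper.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(interruptSleeper());
    }
}
```

If interrupt() arrives before the sleeper reaches sleep(), the pending interrupt makes sleep() throw immediately, so the demo finishes quickly either way.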
Example
Coordinating concurrent jobs
public class Job extends Thread {
    public void run() {
        // do some meaningful work, e.g., writing multithreaded applets.
    }

    // join() may throw InterruptedException, so main declares it.
    public static void main(String[] args) throws InterruptedException {
        Job[] jobs = new Job[10];
        for (int i = 0; i < jobs.length; i++) {
            jobs[i] = new Job();
            jobs[i].start();
        }
        for (int i = 0; i < jobs.length; i++) {
            jobs[i].join(); // wait for each job to terminate
        }
    }
}
[Figure: main and jobs[0] … jobs[9]]
Exercise
Explain the execution flow of the following program.
public class Job extends Thread {
    public void run() {
        // do some meaningful work, e.g., writing multithreaded applets.
    }

    // join() may throw InterruptedException, so main declares it.
    public static void main(String[] args) throws InterruptedException {
        Job[] jobs = new Job[10];
        for (int i = 0; i < jobs.length; i++) {
            jobs[i] = new Job();
            jobs[i].start();
            jobs[i].join();
        }
    }
}
Thread Safety

Example --- bank accounts
public class Account {
    private long balance;

    public boolean withdraw(long amount) {
        if (amount <= balance) {
            long newBalance = balance - amount;
            balance = newBalance;
            return true;
        } else {
            return false;
        }
    }
    // other methods and fields
}

What might happen in the presence of multiple threads?
Example (Cont.)

Possible scenario
if (amount <= balance) {
    long newBalance = balance - amount;
    balance = newBalance;
    return true;
}
Assume that the initial balance is $1,000,000, and two withdraw
requests of $1,000,000 are made almost simultaneously.
time  balance    withdraw 1        withdraw 2
t1    1,000,000  amount<=balance
t2    1,000,000                    amount<=balance
t3    1,000,000  newbalance=...;
t4    1,000,000                    newbalance=...;
t5    0          balance=...;
t6    0                            balance=...;
t7    0          return true;
t8    0                            return true;
Atomicity of Operations

The problem
 Several calls of the withdraw method may be interleaved, interfering with each other.

Atomic operation
 An operation that can’t be interrupted, e.g., reading and assignment to (word-sized) variables.
 Non-atomic operations should be explicitly synchronized to ensure atomicity.
Making methods atomic

Synchronized methods and statements

A lock is associated with each object.
// synchronized method
synchronized void aMethod() {
    // do something
}

// synchronized statement
synchronized (expr) { // expr should be of a reference type!
    // do something
}
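
Both forms can be tried on a small shared counter. This is a sketch only, assuming Java 8 or later; the class SafeCounter and the method countWithTwoThreads are hypothetical names. Two threads increment the same counter, and because every access holds the counter's lock, no increment is lost.

```java
class SafeCounter {
    private long count = 0;

    // Synchronized method: holds the lock of this SafeCounter while running.
    synchronized void increment() {
        count++; // read-modify-write, not atomic without the lock
    }

    // Synchronized statement: locks the same object explicitly.
    long get() {
        synchronized (this) {
            return count;
        }
    }

    // Runs two threads that each call increment() perThread times.
    static long countWithTwoThreads(int perThread) {
        SafeCounter c = new SafeCounter();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                c.increment();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return c.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(100000)); // prints: 200000
    }
}
```

Removing the synchronized keyword from increment() would allow interleavings like the withdraw scenario, so the final count could come out lower than expected.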
Example
public class Account {
    private long balance;

    public synchronized boolean withdraw(long amount) {
        if (amount <= balance) {
            long newBalance = balance - amount;
            balance = newBalance;
            return true;
        } else {
            return false;
        }
    }
    // other methods and fields
}
Exercise
Make the withdraw method synchronized by using the
synchronized statement.
public class Account {
    private long balance;

    public boolean withdraw(long amount) {
        // WRITE YOUR CODE HERE…
        if (amount <= balance) {
            long newBalance = balance - amount;
            balance = newBalance;
            return true;
        } else {
            return false;
        }
    }
}
Synchronization

Synchronization
 A mechanism to control the order in which threads execute
 Competition vs. cooperative synchronization

Mutual exclusion of threads
 Each synchronized method or statement is guarded by an object.
 When a thread enters a synchronized method or statement, the object is locked until the method or statement finishes.
 When the object is locked by another thread, the current thread must wait.
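
Which object guards a given synchronized method or statement can be checked with the static method Thread.holdsLock(Object). The sketch below uses a hypothetical class LockOwnerDemo to show that the lock is held only while the thread is inside the synchronized code.

```java
class LockOwnerDemo {
    // Synchronized instance method: the guarding object is this.
    synchronized boolean heldInsideMethod() {
        return Thread.holdsLock(this);
    }

    // Synchronized statement: the guarding object is the one in parentheses.
    boolean heldInsideStatement(Object guard) {
        synchronized (guard) {
            return Thread.holdsLock(guard);
        }
    }

    // Not synchronized: the calling thread does not acquire the lock here.
    boolean heldOutside() {
        return Thread.holdsLock(this);
    }

    public static void main(String[] args) {
        LockOwnerDemo d = new LockOwnerDemo();
        System.out.println(d.heldInsideMethod() + " "
                + d.heldInsideStatement(new Object()) + " "
                + d.heldOutside()); // prints: true true false
    }
}
```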
Example --- Bounded Queue
A first-in, first-out queue with a fixed capacity
public class BoundedQueue {
    protected Object rep[];   // circular array
    protected int front = 0;  // front of the queue
    protected int back = -1;  // back of the queue
    protected int size;       // capacity of the queue
    protected int count = 0;  // num of objects in the queue

    //@ requires size > 0;
    public BoundedQueue(int size) {
        this.size = size;
        rep = new Object[size];
        back = size - 1;
    }

    public boolean isEmpty() {
        return (count == 0);
    }
Bounded Queue (Cont.)
    public boolean isFull() { return (count == size); }

    public int getCount() { return count; }

    public void put(/*@ non_null @*/ Object e) {
        if (!isFull()) {
            back = (back + 1) % size;
            rep[back] = e;
            count++;
        }
    }

    public Object get() {
        Object result = null;
        if (!isEmpty()) {
            result = rep[front];
            front = (front + 1) % size;
            count--;
        }
        return result;
    }
}
Making Queue Thread-Safe
public class SyncBoundedQueue extends BoundedQueue {
    public SyncBoundedQueue(int size) { super(size); }
    public synchronized boolean isEmpty() { return super.isEmpty(); }
    public synchronized boolean isFull() { return super.isFull(); }
    public synchronized int getCount() { return super.getCount(); }
    public synchronized void put(Object e) { super.put(e); }
    public synchronized Object get() { return super.get(); }
}
Use of Bounded Queue
Typical use in producers and consumers
[Figure: class diagram with Thread, Thief, Broker, and SyncBoundedQueue]
Producer
public class Thief extends Thread {
    private BoundedQueue queue;
    private int n; // number of items to steal

    public Thief(BoundedQueue queue, int n) {
        this.queue = queue;
        this.n = n;
    }

    public void run() {
        for (int i = 0; i < n; i++) {
            queue.put(Integer.valueOf(i));
            System.out.println("produced: " + i);
            try {
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException e) {}
        }
    }
}
Consumer
public class Broker extends Thread {
    private BoundedQueue queue;
    private int n; // number of stolen items to sell

    public Broker(BoundedQueue queue, int n) {
        this.queue = queue;
        this.n = n;
    }

    public void run() {
        for (int i = 0; i < n; i++) {
            Object obj = queue.get();
            System.out.println("\tconsumed: " + obj);
            try {
                sleep((int) (Math.random() * 400));
            } catch (InterruptedException e) {}
        }
    }
}
Sample Main Program
The main method of class SyncBoundedQueue
public static void main(String[] args) {
SyncBoundedQueue queue = new SyncBoundedQueue(5);
new Thief(queue, 15).start(); // produce 15 items
new Broker(queue, 10).start(); // consume 10 items
}
Output
Some of the items might be lost, as the producer produces items
faster than the consumer consumes them.
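
The loss does not require threads to reproduce. The sketch below is a condensed, single-threaded copy of the put and get logic of BoundedQueue, under the hypothetical name DroppingQueue. It shows that put silently ignores an item when the queue is full and that get returns null when the queue is empty.

```java
class DroppingQueue {
    private final Object[] rep;  // circular array
    private int front = 0;       // index of the next item to remove
    private int back;            // index of the last item added
    private int count = 0;       // number of items currently stored

    DroppingQueue(int size) {
        rep = new Object[size];
        back = size - 1;
    }

    // Same policy as BoundedQueue.put: ignore the item when full.
    void put(Object e) {
        if (count < rep.length) {
            back = (back + 1) % rep.length;
            rep[back] = e;
            count++;
        }
    }

    // Same policy as BoundedQueue.get: return null when empty.
    Object get() {
        Object result = null;
        if (count > 0) {
            result = rep[front];
            front = (front + 1) % rep.length;
            count--;
        }
        return result;
    }

    int getCount() { return count; }

    public static void main(String[] args) {
        DroppingQueue q = new DroppingQueue(2);
        q.put("a");
        q.put("b");
        q.put("c"); // dropped: the queue already holds 2 items
        System.out.println(q.get() + " " + q.get() + " " + q.get()); // prints: a b null
    }
}
```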
Cooperation among Threads

Guarded suspension
 Before a method is executed, the guard is tested. A guard is the precondition for an action to complete successfully.
 Execution continues only when the guard is true, ensuring the successful completion of the method invocation.
 Otherwise, execution is temporarily suspended until the guard becomes true, at which time execution may continue.

Thread controlling methods
 wait(), notify(), and notifyAll() defined in the class Object
 Can be used to implement guarded suspension
Thread Methods of Class Object
Method       Description
wait()       Temporarily blocked and placed in the wait queue of the receiving object.
notify()     Awaken one of the threads in the wait queue associated with the receiving object and remove it from the queue.
notifyAll()  Same as notify() except that all threads in the wait queue are awakened.
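
Guarded suspension with these methods can be illustrated on something smaller than a queue. The following is a sketch, assuming Java 8 or later, of a hypothetical Gate class whose guard is "the gate has been opened"; it is not part of the course code.

```java
class Gate {
    private boolean opened = false;

    // Guarded method: returns only once the guard (opened) is true.
    synchronized void await() throws InterruptedException {
        while (!opened) {  // re-test the guard after every wake-up
            wait();        // releases the lock on this Gate while waiting
        }
    }

    // Makes the guard true and wakes every thread waiting on this Gate.
    synchronized void open() {
        opened = true;
        notifyAll();
    }

    // Demo: a waiter thread passes the gate only after it has been opened.
    static boolean demo() {
        Gate gate = new Gate();
        final boolean[] passed = { false };
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                passed[0] = true;
            } catch (InterruptedException e) {
                // give up if interrupted while waiting
            }
        });
        waiter.start();
        gate.open();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: true
    }
}
```

The guard is tested in a while loop rather than an if, so a thread that wakes up while the guard is still false simply waits again.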
Bounded Queue with Guarded Suspension
public class BoundedQueueWithGuard extends BoundedQueue {
    public BoundedQueueWithGuard(int size) {
        super(size);
    }

    public synchronized boolean isEmpty() {
        return super.isEmpty();
    }

    public synchronized boolean isFull() {
        return super.isFull();
    }

    public synchronized int getCount() {
        return super.getCount();
    }

    <<put, get, and main method>>
}
Put Method
public synchronized void put(Object obj) {
    try {
        while (isFull()) {   // guard: wait until the queue has room
            wait();
        }
    } catch (InterruptedException e) {}
    super.put(obj);
    notify();                // wake a thread waiting on this queue
}
Exercise --- Get Method
Write the get method with guarded suspension.
public synchronized Object get() {
// WRITE YOUR CODE HERE.
}
Smart Thief and Broker
public static void main(String args[]) {
    BoundedQueueWithGuard queue = new BoundedQueueWithGuard(5);
    new Thief(queue, 15).start();  // produce 15 items
    new Broker(queue, 15).start(); // consume 15 items
}
Output:
No lost items!