Multithreading in Java

Introduction to Threads

This article demonstrates some of the more advanced concepts in Java multithreading. It assumes you are already familiar with the basics of threads in Java. There are four common ways to run code on a thread: (a) extending the Thread class, (b) implementing the Runnable interface and passing it to the Thread constructor, (c) using an anonymous class, as shown below, and (d) using an executor service, which manages a thread pool. We will discuss the last two.
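As a quick refresher on options (a) and (b), here is a minimal sketch. The class names GreeterThread, GreeterTask and BasicThreadsDemo are made up for illustration and are not part of the listings that follow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// (a) Extend Thread and override run()
class GreeterThread extends Thread {
    private final List<String> log;

    GreeterThread(List<String> log) {
        this.log = log;
    }

    @Override
    public void run() {
        log.add("from Thread subclass");
    }
}

// (b) Implement Runnable and hand it to a Thread
class GreeterTask implements Runnable {
    private final List<String> log;

    GreeterTask(List<String> log) {
        this.log = log;
    }

    @Override
    public void run() {
        log.add("from Runnable");
    }
}

class BasicThreadsDemo {

    // Starts one thread of each kind and returns what they logged
    static List<String> runBoth() throws InterruptedException {
        // Synchronized wrapper because both threads add to the same list
        List<String> log = Collections.synchronizedList(new ArrayList<String>());

        Thread a = new GreeterThread(log);
        Thread b = new Thread(new GreeterTask(log));

        a.start();
        b.start();
        a.join();
        b.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // The order of the two entries depends on thread scheduling
        System.out.println(runBoth());
    }
}
```

Option (b) is usually the more flexible of the two, because the task is not tied to the Thread class and can later be handed to an executor service unchanged.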

The code below shows three important concepts: (a) Use of volatile, (b) Use of final, and (c) Anonymous Class


import java.util.Scanner;

public class AnonymousRunnable {

//1. Use of volatile
private volatile boolean running = true;

//2. Use of final
public void processor(final String world) {

//3. Use of anonymous class
Thread t = new Thread(new Runnable() {

@Override
public void run() {

while (running) {
System.out.println("Hello " + world);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
});

t.start();
}

public void shutdown(){
running=false;
}

public static void main(String args[]) {
AnonymousRunnable ar = new AnonymousRunnable();
ar.processor("world");

System.out.println("Hit Enter to stop...");
Scanner sc = new Scanner(System.in);
sc.nextLine();

ar.shutdown();
}
}

Use of volatile:

The volatile keyword ensures that every read of the variable sees the most recent write made by any thread, instead of a stale value held in a register or CPU cache. In the code above, the variable “running” is declared volatile because it is read inside the thread's run method and changed from outside that thread, by shutdown(). Without volatile, the compiler or JIT may assume that nothing in the loop changes “running” and cache its value as an optimisation, so the loop could keep seeing true and never stop. Declaring it volatile prevents this.

Use of final keyword:

In the code above, the String parameter is declared final because it is accessed by a method of the anonymous inner class. Before Java 8, leaving out final is a compile-time error; from Java 8 onwards the variable only needs to be effectively final, that is, never reassigned. The reason is that the invocation of the processor method can finish before the inner class’s run method does. When processor returns, its local variables are gone, so the inner class cannot refer to them directly. Instead it receives its own copy of the value when it is created. Requiring the variable to be final guarantees that the copy and the original can never differ.
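The capture rule can be sketched as follows, assuming Java 8 or later so that an effectively final local is accepted without the keyword. CaptureDemo and greetLater are hypothetical names used only for this illustration.

```java
class CaptureDemo {

    // Returns a task that still sees `name` and `greeting` after this method
    // has returned, because the anonymous class holds its own copies.
    static Runnable greetLater(final String name, final StringBuilder out) {
        String greeting = "Hello ";   // effectively final: assigned once, never reassigned

        // Reassigning `greeting` anywhere in this method would make the
        // anonymous class below fail to compile.
        return new Runnable() {
            @Override
            public void run() {
                out.append(greeting).append(name);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        StringBuilder out = new StringBuilder();
        Thread t = new Thread(greetLater("world", out));
        t.start();
        t.join();                     // after join, the thread's writes are visible here
        System.out.println(out);      // prints "Hello world"
    }
}
```

Note that final only fixes the reference: the StringBuilder itself is still mutable, which is why the task can append to it.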

Anonymous Class:

If you need an implementation only once, one technique is to use an anonymous class. In the code above we implement the Runnable interface in an anonymous class and override its run method.

Thread Joining and Synchronisation

The example below covers thread joining and synchronisation.


public class ThreadJoin {

	private int count=0;

	//1. Use of synchronized
	public synchronized void increment(){
		count++;
	}

	public static void main(String args[]) {

		new ThreadJoin().doWork();
	}

	private void doWork() {
		//This thread calls increment() 10000 times
		Thread t1 = new Thread(new Runnable(){
			public void run(){
				for(int i=0;i<10000;i++)
					increment();
			}

		});
		//This thread also calls increment() 10000 times
		Thread t2 = new Thread(new Runnable(){
			public void run(){
				for(int i=0;i<10000;i++)
					increment();
			}

		});

		t1.start();
		t2.start();

		//Thread Joining
		try {
			t1.join();
			t2.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		System.out.println("Count = "+count);

	}

}

1. Thread joining: The Thread class has a join method. When you call join on a thread, the calling thread waits for that thread to finish executing. In this case, the main thread waits for t1 and t2 to finish, so that count has been incremented the desired number of times before we print its value.

2. Synchronization: When two or more threads execute, their operations can interleave. When they update a shared variable, there is a chance of data corruption. This is how the corruption can happen:
count++ is not one operation. It is:
(i) get the value of count
(ii) increment it by 1
(iii) update the count variable

We want this operation to be atomic, that is, a thread should complete all three steps before any other thread can start them. For example, suppose the value of count is 100. t1 reads it and is then suspended. t2 reads it, increments it and writes it back, so count is now 101, and t2 is suspended. t1 resumes with the old value of 100, increments it and writes it back. count is again 101 instead of 102. To avoid such lost updates we use synchronisation.
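For a single counter, one alternative to the synchronized method is java.util.concurrent.atomic.AtomicInteger, whose incrementAndGet() performs the read, increment and write as one atomic step. This is a swapped-in technique, not what the ThreadJoin listing uses; AtomicCounterDemo and countWithTwoThreads are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {

    // Two threads each increment the shared counter `perThread` times
    static int countWithTwoThreads(final int perThread) throws InterruptedException {
        final AtomicInteger count = new AtomicInteger(0);

        Runnable work = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    count.incrementAndGet();   // atomic read-modify-write
                }
            }
        };

        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();

        // Wait for both threads before reading the result
        t1.join();
        t2.join();
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Count = " + countWithTwoThreads(10000));   // prints "Count = 20000"
    }
}
```

An atomic variable only protects a single value. When several fields have to change together, a lock is still the simpler tool.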

Multiple Locks/Synchronized blocks

We have seen that to act on shared data we must use synchronisation, so that one thread at a time performs the entire set of operations and the data stays consistent. In the previous example, we used the synchronized keyword on the method, which made sure that only one thread at a time can run it. How does this happen? When an instance method is synchronized, a thread must acquire the lock of the object the method is called on before it can enter the method. In the ThreadJoin example, t1 acquires the lock of the ThreadJoin object, does its work and releases the lock. In the meantime, t2 is blocked and gets the lock only after t1 has released it. Thus, we avoid the corruption.
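To make the "lock on the object" idea concrete, here is a small sketch in which a synchronized method and a synchronized (this) block guard the same counter. Because both forms use the same lock, the two threads never interleave inside an increment. IntrinsicLockDemo and its method names are assumptions made for this illustration.

```java
class IntrinsicLockDemo {

    private int count = 0;

    // Method-level form: holds the lock of `this` for the whole method
    public synchronized void incrementInMethod() {
        count++;
    }

    // Block form: the same lock, written out explicitly
    public void incrementInBlock() {
        synchronized (this) {
            count++;
        }
    }

    public synchronized int getCount() {
        return count;
    }

    // One thread uses the method form, the other the block form
    static int runBoth(final int perThread) throws InterruptedException {
        final IntrinsicLockDemo demo = new IntrinsicLockDemo();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    demo.incrementInMethod();
                }
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    demo.incrementInBlock();
                }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return demo.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Count = " + runBoth(10000));   // prints "Count = 20000"
    }
}
```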

We will now look at another approach: using locks that are independent of the object that contains the instance methods. This is done using synchronized blocks. Synchronized blocks have two advantages over method-level synchronisation. (1) You can lock only the specific piece of code that needs it rather than the whole method, so the lock is held for a shorter time. (2) With method-level synchronisation, the lock is always on the current object. With a synchronized block, you specify the object to lock on, which is more flexible.

In the example below, t1 and t2 both add to list1 and list2, but we don’t want to lock the entire object. If stageone and stagetwo were synchronized methods, then while t1 is executing stageone, t2 could not execute stagetwo and would sit idle, and vice versa, although the two methods touch independent data. Each thread makes 1000 passes through two stages that sleep for 1 ms each, roughly 2 seconds of work per thread, so with a single lock the overall execution time would be about 4 seconds instead of 2. To solve this problem we declare two Objects and lock on them using the syntax synchronized(lock1) and synchronized(lock2). This means that while t1 holds lock1 and adds to list1, t2 can still acquire lock2 and add to list2, and vice versa. This results in an overall time of about 2 seconds.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class MultipleLocks {

	private Random random = new Random();

	//Objects created for locking. Only one thread at a time can hold the lock on each object.
	private Object lock1 = new Object();
	private Object lock2 = new Object();

	private List<Integer> list1 = new ArrayList<Integer>();
	private List<Integer> list2 = new ArrayList<Integer>();

	public void stageone(){
		//Lock on lock1 instead of on the MultipleLocks object itself, using a synchronized block
		synchronized(lock1) {
			try {
				Thread.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			list1.add(random.nextInt(100));
		}
	}

	public void stagetwo(){
		//Lock on lock2 instead of on the MultipleLocks object itself
		synchronized(lock2) {
			try {
				Thread.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			list2.add(random.nextInt(100));
		}
	}

	public void process(){

		for(int i=0;i<1000;i++){
			stageone();
			stagetwo();
		}

		System.out.println();

	}

	public void processor(){
		System.out.println("Starting ...");
		long start = System.currentTimeMillis();

		Thread t1 = new Thread(new Runnable(){
			@Override
			public void run() {
				process();
			}
		});

		Thread t2 = new Thread(new Runnable(){
			@Override
			public void run() {
				process();
			}
		});

		t1.start();
		t2.start();

		try {
			t1.join();
			t2.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		long end = System.currentTimeMillis();

		System.out.println("Time Taken:"+(end-start)/1000+" seconds ");

		System.out.println("List1 Size:"+list1.size()+"\tList2 Size:"+list2.size());
	}

	public static void main(String[] args) {

		new MultipleLocks().processor();
	}

}

Executor Service/Thread Pools

An executor service provides a pool of threads to work with. You submit tasks to the executor service, and it assigns them to threads in the pool. With more worker threads, more tasks can run at the same time, up to the limits of the hardware and of the work itself. Because the ExecutorService reuses the threads in its pool, it avoids the overhead of creating a new thread for every task.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Processor implements Runnable {

	public void run() {

		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

public class ExecutorServiceExample  {

	public static void main(String[] args) {

		ExecutorService es = Executors.newFixedThreadPool(5);

		long start = System.currentTimeMillis();

		//submitting the tasks (each task is a Runnable, not a Thread) to the executor service
		for(int i=0; i< 5; i++){
			es.submit(new Processor());
		}

		//shutdown() stops the executor accepting new tasks; tasks already submitted still run to completion
		es.shutdown();

		//normal flow continues while the tasks continue to run
		System.out.println("All tasks submitted");

		//awaitTermination blocks until all tasks have completed or the timeout elapses, whichever comes first. It does not stop running tasks; it returns false on timeout.
		try {
			es.awaitTermination(1, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		long end = System.currentTimeMillis();

		System.out.println("All tasks completed in "+((end-start)/1000)+" seconds");

	}

}
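The listing above submits Runnable tasks, which return nothing. When a task needs to hand back a value, one option is to submit a Callable and read the result from the Future that submit() returns. The sketch below assumes that use case; SquareSumDemo and sumOfSquares are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SquareSumDemo {

    // Squares 1..n on a pool of `poolSize` threads and adds up the results
    static int sumOfSquares(int n, int poolSize)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                futures.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return x * x;
                    }
                }));
            }

            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get();   // blocks until that task has finished
            }
            return sum;
        } finally {
            pool.shutdown();      // let the pool threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10, 3));   // prints 385
    }
}
```

Calling get() on each Future also removes the need for awaitTermination here, since the loop cannot finish before every task has.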

CountDownLatch Example


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Processor1 implements Runnable {
private CountDownLatch latch;

public Processor1(CountDownLatch latch) {
this.latch = latch;
}

public void run() {
System.out.println("Started.");

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

latch.countDown();
}
}

public class CountDownLatchExample {

public static void main(String[] args) {

CountDownLatch latch = new CountDownLatch(3);

ExecutorService executor = Executors.newFixedThreadPool(3);

for(int i=0; i < 3; i++) {
executor.submit(new Processor1(latch));
}

try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

// Shut the pool down so its threads exit and the JVM can terminate
executor.shutdown();

System.out.println("Completed.");
}

}
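The listing above uses the latch as a completion signal: main waits until three tasks have counted down. A latch with a count of one can also work as a start gate that holds workers back until main releases them. The sketch below combines both uses; StartGateDemo and runWorkers are hypothetical names, and the counter increment stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {

    // Starts `workers` threads, releases them together, and returns how many ran
    static int runWorkers(int workers) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(workers);
        final AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startGate.await();            // wait until main opens the gate
                        finished.incrementAndGet();   // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();             // always report completion
                    }
                }
            }).start();
        }

        startGate.countDown();   // release all workers at once
        done.await();            // wait for every worker to count down
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Workers finished: " + runWorkers(3));   // prints "Workers finished: 3"
    }
}
```

Putting countDown() in a finally block matters: if a worker failed before counting down, done.await() would block forever.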

Producer-Consumer/BlockingQueue

 

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class App {

private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
public void run() {
try {
producer();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

Thread t2 = new Thread(new Runnable() {
public void run() {
try {
consumer();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

t1.start();
t2.start();

t1.join();
t2.join();
}

private static void producer() throws InterruptedException {
Random random = new Random();

while(true) {
queue.put(random.nextInt(100));
}
}

private static void consumer() throws InterruptedException {
Random random = new Random();

while(true) {
Thread.sleep(100);

if(random.nextInt(10) == 0) {
Integer value = queue.take();

System.out.println("Taken value: " + value + "; Queue size is: " + queue.size());
}
}
}
}
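The producer and consumer above loop forever, so the program never ends on its own. One common way to stop cleanly is a sentinel value, often called a poison pill, that the producer puts on the queue when it is done. The sketch below assumes real values are never negative so that -1 can serve as the sentinel; PoisonPillDemo, POISON and produceAndConsume are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {

    private static final int POISON = -1;   // sentinel; assumes real values are >= 0

    // Produces 0..items-1 followed by the sentinel; returns the sum the consumer saw
    static int produceAndConsume(final int items) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        final int[] sum = new int[1];        // written only by the consumer thread

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < items; i++) {
                        queue.put(i);        // blocks while the queue is full
                    }
                    queue.put(POISON);       // tell the consumer to stop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        int value = queue.take();   // blocks while the queue is empty
                        if (value == POISON) {
                            return;
                        }
                        sum[0] += value;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum = " + produceAndConsume(100));   // prints "Sum = 4950"
    }
}
```

With several consumers, the producer would need to enqueue one sentinel per consumer.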

 

Producer-Consumer (Wait/Notify)


import java.util.Scanner;

class Processor2 {

public void produce() throws InterruptedException {
synchronized (this) {
System.out.println("Producer thread running ...");
wait();
System.out.println("Resumed.");
}
}

public void consume() throws InterruptedException{

Scanner scanner = new Scanner(System.in);
Thread.sleep(2000);

synchronized (this) {
System.out.println("Waiting for return key.");
scanner.nextLine();
System.out.println("Return key pressed.");
notify();
Thread.sleep(5000);
}

}

}

public class ProducerConsumerWaitNotify {

public static void main(String[] args) throws InterruptedException {

final Processor2 processor = new Processor2();

Thread t1 = new Thread(new Runnable() {

@Override
public void run() {
try {
processor.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread t2 = new Thread(new Runnable() {

@Override
public void run() {
try {
processor.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

t1.start();
t2.start();

t1.join();
t2.join();
}
}
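In the listing above, the two-second sleep in consume() is what makes it likely that produce() is already waiting when notify() is called. If notify() ran first, the producer would wait forever. A more robust pattern, sketched here under that concern, keeps a flag under the same lock and waits in a loop, which also copes with spurious wakeups. Signal, awaitSignal and SignalDemo are hypothetical names.

```java
class Signal {

    private boolean ready = false;   // the condition, guarded by this object's lock

    // Blocks until signal() has been called, even if it was called earlier
    public synchronized void awaitSignal() throws InterruptedException {
        while (!ready) {   // re-check the condition after every wakeup
            wait();        // releases the lock while waiting
        }
    }

    public synchronized void signal() {
        ready = true;
        notifyAll();       // wake every thread waiting on this object
    }
}

class SignalDemo {

    // Signals first, then starts the waiter, to show that nothing is missed
    static String signalThenWait() throws InterruptedException {
        final Signal signal = new Signal();
        final StringBuffer log = new StringBuffer();

        Thread waiter = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    signal.awaitSignal();
                    log.append("resumed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        signal.signal();   // happens before the waiter even starts
        waiter.start();
        waiter.join();
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(signalThenWait());   // prints "resumed"
    }
}
```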

Low Level Producer Consumer

 

import java.util.LinkedList;
import java.util.Random;

public class Processor {

private LinkedList<Integer> list = new LinkedList<Integer>();
private final int LIMIT = 10;
private Object lock = new Object();

public void produce() throws InterruptedException {

int value = 0;

while (true) {

synchronized (lock) {

while(list.size() == LIMIT) {
lock.wait();
}

list.add(value++);
lock.notify();
}

}
}

public void consume() throws InterruptedException {

Random random = new Random();

while (true) {

synchronized (lock) {

while(list.size() == 0) {
lock.wait();
}

System.out.print("List size is: " + list.size());
int value = list.removeFirst();
System.out.println("; value is: " + value);
lock.notify();
}

Thread.sleep(random.nextInt(1000));
}
}
}

public class App {

public static void main(String[] args) throws InterruptedException {

final Processor processor = new Processor();

Thread t1 = new Thread(new Runnable() {

@Override
public void run() {
try {
processor.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread t2 = new Thread(new Runnable() {

@Override
public void run() {
try {
processor.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

t1.start();
t2.start();

t1.join();
t2.join();
}
}

 

Deadlocks


import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Runner {
private Account acc1 = new Account();
private Account acc2 = new Account();

private Lock lock1 = new ReentrantLock();
private Lock lock2 = new ReentrantLock();

private void acquireLocks(Lock firstLock, Lock secondLock) throws InterruptedException {
while(true) {
// Acquire locks

boolean gotFirstLock = false;
boolean gotSecondLock = false;

try {
gotFirstLock = firstLock.tryLock();
gotSecondLock = secondLock.tryLock();
}
finally {
if(gotFirstLock && gotSecondLock) {
return;
}

if(gotFirstLock) {
firstLock.unlock();
}

if(gotSecondLock) {
secondLock.unlock();
}
}

// Locks not acquired
Thread.sleep(1);
}
}

public void firstThread() throws InterruptedException {

Random random = new Random();

for (int i = 0; i < 10000; i++) {

acquireLocks(lock1, lock2);

try {
Account.transfer(acc1, acc2, random.nextInt(100));
} finally {
lock1.unlock();
lock2.unlock();
}
}
}

public void secondThread() throws InterruptedException {
Random random = new Random();

for (int i = 0; i < 10000; i++) {

acquireLocks(lock2, lock1);

try {
Account.transfer(acc2, acc1, random.nextInt(100));
} finally {
lock1.unlock();
lock2.unlock();
}
}
}

public void finished() {
System.out.println("Account 1 balance: " + acc1.getBalance());
System.out.println("Account 2 balance: " + acc2.getBalance());
System.out.println("Total balance: "
+ (acc1.getBalance() + acc2.getBalance()));
}
}

public class App {

public static void main(String[] args) throws Exception {

final Runner runner = new Runner();

Thread t1 = new Thread(new Runnable() {
public void run() {
try {
runner.firstThread();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

Thread t2 = new Thread(new Runnable() {
public void run() {
try {
runner.secondThread();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

t1.start();
t2.start();

t1.join();
t2.join();

runner.finished();
}

}

class Account {
private int balance = 10000;

public void deposit(int amount) {
balance += amount;
}

public void withdraw(int amount) {
balance -= amount;
}

public int getBalance() {
return balance;
}

public static void transfer(Account acc1, Account acc2, int amount) {
acc1.withdraw(amount);
acc2.deposit(amount);
}
}
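The Runner listing above avoids deadlock by backing off with tryLock() whenever it cannot get both locks. Another well-known approach, swapped in here as an alternative, is to always acquire the locks in one fixed global order, so that two threads can never each hold the lock the other needs. The sketch assumes every account carries a unique id to order by; OrderedAccount, its id field and LockOrderingDemo are hypothetical and not part of the listing above.

```java
class OrderedAccount {

    final int id;            // hypothetical unique id used to order lock acquisition
    private int balance;

    OrderedAccount(int id, int balance) {
        this.id = id;
        this.balance = balance;
    }

    int getBalance() {
        return balance;
    }

    // Locks the account with the smaller id first, whatever the transfer direction
    static void transfer(OrderedAccount from, OrderedAccount to, int amount) {
        OrderedAccount first = from.id < to.id ? from : to;
        OrderedAccount second = from.id < to.id ? to : from;

        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }
}

class LockOrderingDemo {

    // Two threads transfer in opposite directions; returns the combined balance
    static int totalAfterTransfers(final int rounds) throws InterruptedException {
        final OrderedAccount a = new OrderedAccount(1, 10000);
        final OrderedAccount b = new OrderedAccount(2, 10000);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < rounds; i++) {
                    OrderedAccount.transfer(a, b, 10);
                }
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < rounds; i++) {
                    OrderedAccount.transfer(b, a, 5);
                }
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Balances are read after join(), so both threads' writes are visible
        return a.getBalance() + b.getBalance();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Total balance: " + totalAfterTransfers(10000));   // prints "Total balance: 20000"
    }
}
```

Lock ordering needs no retry loop, but it only works when every code path that takes both locks follows the same order.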
