GuangchaoSun's Blog

Java并发性与多线程

多线程的代价

  • 设计更复杂:
  • 上下文切换的开销:
    • 上下文切换(context switch):当CPU从执行一个线程切换到另一个线程的时候,它需要首先存储当前线程的本地数据,程序指针等,然后载入另一个线程的本地数据,程序指针等。最后才开始执行
  • 增加资源消耗:

启动线程的顺序可以是有序的,但执行的顺序并非是有序的。这是因为线程是并行执行而非顺序的。JVM和操作系统一起决定了线程的执行顺序,它和线程的启动顺序并非一定是一致的。

竟态条件与临界区

当两个线程竞争同一资源时,如果对资源的访问顺序敏感,就称存在竟态条件。导致竟态条件发生的代码区称作临界区。在临界区使用适当的同步就可以避免竟态条件

线程安全与共享资源

局部变量

局部变量存储在线程自己的栈中。也就是说,局部变量永远也不会被多个线程共享。所以基础类型的局部变量是线程安全的

1
2
3
4
public void someMethod(){
long threadSafeInt = 0;
threadSafeInt++;
}

局部的对象引用

对象的局部引用和基础类型的局部引用不太一样。尽管引用本身没有被线程共享,但引用所指的对象并没有存储在线程的栈内。所有对象都存在共享堆中。如果在某个方法中创建的对象不会逃逸出(即该对象不会被其他方法获得,也不会被非局部变量引用到)该方法,那么他就是线程安全的

1
2
3
4
5
6
7
8
9
public void someMethod(){
LocalObject localObject = new LocalObject();
localObject.callMethod();
method2(localObject);
}
public void method2(LocalObject localObject){
localObject.setValue("value");
}

样例中LocalObject对象没有被方法返回,也没有被传递给someMethod()方法外的对象。每个执行someMethod()的线程都会创建自己的LocalObject对象,并赋值给localObject引用。因此,这里的LocalObject是线程安全的。事实上,整个someMethod()都是线程安全的。即使将LocalObject作为参数传给同一个类的其它方法或其它类的方法时,它仍然是线程安全的。当然,如果LocalObject通过某些方法被传给了别的线程,那它就不再是线程安全的了。

对象成员

对象成员存储在堆上。如果两个线程同时更新同一个对象的同一个成员,那这个代码就不是线程安全的。

1
2
3
4
5
6
7
public class NotThreadSafe{
StringBuilder builder = new StringBuilder();
public add(String text){
this.builder.append(text);
}
}

如果两个线程同时同时调用一个NotThreadSafe实例上的add()方法,就会有竟态条件问题。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
NotThreadSafe sharedInstance = new NotThreadSafe();
new Thread(new MyRunnable(sharedInstance)).start();
new Thread(new MyRunnable(sharedInstance)).start();
public class MyRunnable implements Runnable{
NotThreadSafe instance = null;
public MyRunnable(NotThreadSafe instance){
this.instance = instance;
}
public void run(){
this.instance.add("some text");
}
}

注意两个MyRunnable共享了一个NotThreadSafe对象。因此,当他们调用add()方法时就会造成竟态条件。

1
2
new Thread(new MyRunnable(new NotThreadSafe())).start();
new Thread(new MyRunnable(new NotThreadSafe())).start();

现在两个线程都有自己单独的NotThreadSafe对象,调用add()方法时就会互不干扰,再也不会有竞态条件问题了。所以非线程安全的对象仍可以通过某种方式来消除竞态条件。

线程控制逃逸规则

如果一个资源的创建,使用,销毁都在同一个线程内完成,且永远不会脱离该线程的控制,则该资源的使用就是线程安全的。

资源可以是对象,数组,文件,数据库连接,套接字等等。

线程安全及不可变性

当多个线程访问同一个资源,并且其中的一个或多个线程对这个资源进行了写操作,才会产生竟态条件。多个线程同时读同一个资源不会产生竟态条件。
我们可以通过创建不可变的共享对象来保证对象在线程间不会被修改,从而实现线程安全。如下:

1
2
3
4
5
6
7
8
9
public class ImmutableValue{
private int value = 0;
public ImmutableValue(int value){
this.value = value;
}
public int getValue(){
return this.value;
}
}

请注意ImmutableValue类的成员变量value是通过构造函数赋值的,并且在类中没有set方法。这意味着一旦ImmutableValue实例被创建,value变量就不能再被修改,这就是不可变性。但你可以通过getValue()方法读取这个变量的值。

如果你需要对ImmutableValue类的实例进行操作,可以通过得到value变量后创建一个新的实例来实现,下面是一个对value变量进行加法操作的示例:

1
2
3
4
5
6
7
8
9
10
11
12
public class ImmutableValue(){
private int value = 0;
public ImmutableValue(int value){
this.value = value;
}
public int getValue(){
return this.value;
}
public ImmutableValue add(int valueToAdd){
return new ImmutableValue(this.value + valueToAdd);
}
}

引用不是线程安全的!
即使一个对象是线程安全的不可变对象,指向这个对象的引用也可能不是线程安全的。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void Calculator{
private ImmutableValue currentValue = null;
public ImmutableValue getValue(){
return currentValue;
}
public void setValue(ImmutableValue newValue){
this.currentValue = newValue;
}
public void add(int newValue){
this.currentValue = this.currentValue.add(newValue);
}
}

Calculator类持有一个指向ImmutableValue实例的引用。注意,通过setValue()方法和add()方法可能会改变这个引用。因此,即使Calculator类内部使用了一个不可变对象,但Calculator类本身还是可变的,因此Calculator类不是线程安全的。换句话说:ImmutableValue类是线程安全的,但使用它的类不是。当尝试通过不可变性去获得线程安全时,这点是需要牢记的。

要使Calculator类实现线程安全,将getValue()、setValue()和add()方法都声明为同步方法即可。

Java内存模型内部原理

每一个运行在Java虚拟机里的线程都有自己的线程栈。这个线程栈包含了这个线程调用的方法当前执行点相关的信息。一个线程仅能访问自己的线程栈。一个线程创建的本地变量对其他线程不可见,仅自己可见。即使两个线程执行同样的代码,这两个线程任然在在自己的线程栈中的代码来创建本地变量。因此,每个线程拥有每个本地变量的独有版本。

所有原始类型的本地变量都存放在线程栈上,因此对其它线程不可见。一个线程可能向另一个线程传递一个原始类型变量的拷贝,但是他不能共享这个原始类型变量自身。

Java内存模型和硬件内存架构之间的桥接

对于硬件,所有线程栈和堆都分布在主内存中,部分线程栈和堆可能有时候会出现在CPU缓存中和CPU内部的缓存器中。

想象一下,共享对象被初始化在主存中。跑在CPU上的一个线程将这个共享对象读到CPU缓存中。然后修改了这个对象。只要CPU缓存没有被刷新会主存,对象修改后的版本对跑在其它CPU上的线程都是不可见的。这种方式可能导致每个线程拥有这个共享对象的私有拷贝,每个拷贝停留在不同的CPU缓存中。

下图示意了这种情形。跑在左边CPU的线程拷贝这个共享对象到它的CPU缓存中,然后将count变量的值修改为2。这个修改对跑在右边CPU上的其它线程是不可见的,因为修改后的count的值还没有被刷新回主存中去。

volatile关键字可以保证直接从主存中读取一个变量,如果这个变量被修改后,总是会被写回到主存中去。

race conditions


一个同步块可以保证在同一个时刻仅有一个线程可以进入代码的临界区。同步块还可以保证代码中所有访问变量将会从主存中读入,当线程退出同步代码块时,所有被更新的变量都会被刷新回主存中去,不管这个变量是否被声明为volatile。

Java同步块

Java同步块分为以下四种类型

  • 实例方法同步
    • 实例方法同步是同步在拥有该方法的对象上。一个实例一个线程
  • 静态方法同步
    • 静态方法同步是指同步在该方法的所有类对象上。因为在Java虚拟机中一个类只能对应一个类对象,所以同时只允许一个线程执行同一个类中的静态方法。
  • 实例方法中的同步块
  • 静态方法中的同步块

实例方法中的同步块

1
2
3
4
5
public void add(int value){
synchronized(this){
this.count += value;
}
}

注意Java同步块构造器用括号将对象括起来。在上例中,使用了“this”,即为调用add方法的实例本身。在同步构造器中用括号括起来的对象叫做监视器对象。上述代码使用监视器对象同步,同步实例方法使用调用方法本身的实例作为监视器对象。

一次只有一个线程能够在同步于同一个监视器对象的Java方法内执行。

下面两个例子都同步他们所调用的实例对象上,因此他们在同步的执行效果上是等效的。

1
2
3
4
5
6
7
8
9
10
11
12
public class MyClass{
public synchronized void log1(String msg1, String msg2){
log.writeln(msg1);
log.writeln(msg2);
}
public void log2(String msg1, String msg2){
synchronized(this){
log.writeln(msg1);
log.writeln(msg2);
}
}
}

在上例中,每次只有一个线程能够在两个同步块中任意一个方法内执行。

如果第二个同步块不是同步在this实例对象上,那么两个方法可以被线程同时执行。

静态方法中的同步块

1
2
3
4
5
6
7
8
9
10
11
12
public class MyClass{
public static synchronized void log1(String msg1, String msg2){
log.writeln(msg1);
log.writeln(msg2);
}
public void log2(String msg1, String msg2){
synchronized(MyClass.class){
log.writeln(msg1);
log.writeln(msg2);
}
}
}

这两个方法不允许同时被线程访问。

如果第二个同步块不是同步在MyClass.class这个对象上。那么这两个方法可以同时被线程访问。

线程通信

wait(),notify()和notifyAll()

Java有一个内建的等待机制来允许线程在等待信号的是否变为非运行状态。java.lang.Object类定义了三个方法,wait()、notify和notifyAll()来实现这个等待机制。

一个线程一旦调用了任意对象的wait()方法,就会变为非运行状态,直到另一个线程调用了同一个对象的notify()方法。为了调用wait()或者notify(),线程必须先获得那个对象的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MonitorObject {
}
public class MyWaitNotify {
MonitorObject monitorObject = new MonitorObject();
public void doWait(){
synchronized (monitorObject){
try {
monitorObject.wait();
}catch (InterruptedException e){
}
}
}
public void doNotify(){
synchronized (monitorObject){
monitorObject.notify();
}
}
}

等待线程将调用doWait(),而唤醒线程将调用doNotify()。当一个线程调用一个对象的notify()方法,正在等待该对象的所有线程中将有一个线程被唤醒并允许执行(校注:这个将被唤醒的线程是随机的,不可以指定唤醒哪个线程)。同时也提供了一个notifyAll()方法来唤醒正在等待一个给定对象的所有线程。

如你所见,不管是等待线程还是唤醒线程都在同步块里调用wait()和notify()。这是强制性的!一个线程如果没有持有对象锁,将不能调用wait(),notify()或者notifyAll()。否则,会抛出IllegalMonitorStateException异常。

死锁

死锁是两个或更多线程阻塞着等待其他处于死锁状态的线程所持有的锁。死锁通常发生在多个线程同时但以不同的顺序请求同一组锁的时候。

避免死锁

从下面几个途径来进行:

  • 加锁顺序
  • 加锁时限
  • 死锁检测

饥饿和公平

如果一个线程因为CPU时间全部被其他线程抢走而得不到CPU运行时间,这种状态称为“饥饿”。解决饥饿的方案被称之为“公平性”-即所有线程均能公平地获得运行机会。
在Java中,以下三个常见的原因会导致线程饥饿:

  1. 高优先级线程吞噬所有低优先级线程的CPU时间
  2. 线程被永久堵塞在一个等待进入同步块的状态,因为其他线程总是能在它之前持续地对该同步块进行访问。
  3. 线程在等待一个本身(在其上调用wait)也处于永久等待的对象,因为其他线程总是被持续地获得唤醒。
    • 如果多个线程处在wait()方法执行上,而对其调用notify()不会保证哪一个线程会获得唤醒,任何线程都有可能处于继续等待的状态。因此存在这样一个风险:一个等待线程从来得不到唤醒,因为其他等待线程总是能被获得唤醒。

在Java中实现公平性

1
2
3
4
5
public class Synchronizer{
public synchronized void doSynchronized(){
//do a lot of work whick takes a long time
}
}

如果有一个以上的线程调用doSynchronized()方法,在第一个获得访问的线程未完成前,其他线程将一直处于阻塞状态,而且在这种多线程被阻塞的场景下,接下来将是哪个线程获得访问是没有保障的。

使用锁方式代替同步块

1
2
3
4
5
6
7
8
public class Synchronizer{
Lock lock = new Lock();
public void doSynchronized()throws InterruptedException{
this.lock.lock();
//do a lot of work whick takes a long time
this.lock.unlock();
}
}

注意到上面对Lock的实现,如果存在多线程并发访问lock(),这些线程将阻塞在对lock()方法的访问上。另外,如果锁已经锁上(校对注:这里指的是isLocked等于true时),这些线程将阻塞在while(isLocked)循环的wait()调用里面。要记住的是,当线程正在等待进入lock() 时,可以调用wait()释放其锁实例对应的同步锁,使得其他多个线程可以进入lock()方法,并调用wait()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
boolean isLockedForThisThread = true;
synchronized (this){
waitingThreads.add(queueObject);
}
while (isLockedForThisThread){
synchronized (this){
waitingThreads.remove(queueObject);
lockingThread = Thread.currentThread();
return;
}
}
try {
queueObject.doWait();
}catch (InterruptedException e){
synchronized (this){
waitingThreads.remove(queueObject);
}
throw e;
}
}
public synchronized void unlock(){
if (this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException("" +
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if (waitingThreads.size() > 0){
waitingThreads.get(0).doNotify();
}
}
}

FairLock新创建了一个QueueObject的实例,并对每个调用lock()的线程进行入队列。调用unlock()的线程将从队列头部获取QueueObject,并对其调用doNotify(),以唤醒在该对象上等待的线程。通过这种方式,在同一时间仅有一个等待线程获得唤醒,而不是所有的等待线程。这也是实现FairLock公平性的核心所在。

Java中的读/写锁

读写锁的实现

读取:没有线程正在做写操作且没有线程请求写操作
写入:没有线程正在做写操作

这里假设写操作的优先级比读操作高

写锁重入

当一个线程已经拥有写锁,才允许写锁重入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ReadWriteLock {
private Map<Thread, Integer> readingThreads = new HashMap<Thread, Integer>();//已经持有读锁的线程和对应线程获取读锁的次数
private int writeAccesses = 0;
private int writeRequest = 0;//请求写操作
private Thread writingThread = null;
public synchronized void lockWrite() throws InterruptedException{
writeRequest++;//写请求加1
Thread callingThread = Thread.currentThread();
//不能获得写权限,进入等待
if (!canGrantWriteAccess(writingThread)){
wait();
}
writeRequest--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite(){
writeAccesses--;
if (writeAccesses == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread){
if (hasReaders()) return false;
if (writingThread == null) return true;//无线程持有写锁,返回true
if (!isWriter(callingThread)) return false;
return true;
}
//是否有线程持有读锁
private boolean hasReaders(){
return readingThreads.size() > 0;
}
//该线程是否拥有写锁
private boolean isWriter(Thread callingThread){
return callingThread == writingThread;
}
}

读锁升级到写锁

有时候,我们希望一个拥有读锁的线程,也可以获得写锁。这时候需要这个线程是唯一一个拥有读锁的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ReadWriteLock2 {
private Map<Thread, Integer> readingThreads = new HashMap<Thread, Integer>();
private int writeAccesses = 0;
private int writeRequest = 0;
private Thread writingThread = null;
public synchronized void lockWrite()throws InterruptedException{
writeRequest++;
Thread callingThread = Thread.currentThread();
if (!canGrantWriteAccess(callingThread)){
wait();
}
writeRequest--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite(){
writeAccesses--;
if (writeAccesses == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread){
if (isOnlyReader(callingThread)) return true;
if (hasReaders()) return false;
if (writingThread == null) return true;
if (!isWriter(callingThread)) return false;//不持有写锁的线程不允许重入
return true;
}
private boolean hasReaders(){
return readingThreads.size() > 0;
}
private boolean isWriter(Thread callingThread){
return callingThread == writingThread;
}
//是唯一的读锁线程
private boolean isOnlyReader(Thread thread){
return readingThreads.size() == 1 &&
readingThreads.get(thread) != null;
}
}

可重入的ReadWriteLock的完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class ReadWriteLock3 {
private Map<Thread, Integer> readingThreads = new HashMap<>();
private int writeAccesses = 0;
private int writeRequest = 0;
private Thread writingThread = null;
public synchronized void lockRead() throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(!canGrantReadAccess(callingThread)){
wait();
}
readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
}
public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
if (!isReader(callingThread)){
throw new IllegalMonitorStateException(
"Calling Thread does not" +
" hold a read lock on this ReadWriteLock");
}
int accessCount = getReadAccessCount(callingThread);
if (accessCount == 1){
readingThreads.remove(callingThread);
}else{
readingThreads.put(callingThread, (accessCount-1));
}
notifyAll();
}
public synchronized void lockWrite()throws InterruptedException{
writeRequest++;
Thread callingThread = Thread.currentThread();
if (!canGrantWriteAccess(callingThread)){
wait();
}
writeRequest--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite() throws InterruptedException{
if (!isWriter(Thread.currentThread())){
throw new IllegalMonitorStateException(
"Calling Thread does not" +
" hold the write lock on this ReadWriter");
}
writeAccesses--;
if (writeAccesses == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantReadAccess(Thread callingThread){
if (isWriter(callingThread)) return true;
if (hasWriter()) return false;
if (isReader(callingThread)) return true;
if (hasWriteRequest()) return false;
return true;
}
private boolean canGrantWriteAccess(Thread callingThread){
if (isOnlyReader(writingThread)) return true;
if (hasReaders()) return false;
if (writingThread == null) return true;
if (!isWriter(callingThread)) return false;
return true;
}
private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if (accessCount == null) return 0;
return accessCount.intValue();
}
private boolean hasReaders(){
return readingThreads.size() > 0;
}
private boolean isReader(Thread callingThread){
return readingThreads.get(callingThread) != null;
}
private boolean isOnlyReader(Thread callingThread){
return readingThreads.size() == 1 &&
readingThreads.get(callingThread) != null;
}
private boolean hasWriter(){
return writingThread != null;
}
private boolean isWriter(Thread callingThread){
return writingThread != null;
}
private boolean hasWriteRequest(){
return this.writeRequest > 0;
}
}

对于实例同步方法,锁是当前实例对象。对于静态同步方法,锁是当前对象的class对象

重入锁死

当一个线程重新获取锁,读写锁或者其他不可重入的同步器时,就可能发生重入锁死。可重入的意思是线程可以重复获得它已经持有的锁

避免重入锁死的方法:

  • 编写代码时避免再次获取已经持有的锁
  • 使用可重入锁

信号量

Semapore(信号量)是一种线程同步结构,用于在线程间传递信号,以避免出现信号丢失,或者像锁一样用于保护一个关键区域。
下面是一个信号量的简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
public class Semaphore {
private boolean signal = false;
public synchronized void take(){
this.signal = true;
this.notify();
}
public synchronized void release() throws InterruptedException{
while (!this.signal) wait();
this.signal = false;
}
}

take方法发出一个存放在Semaphore内部的信号,而release方法则等待一个信号,当其接收信号后,标记位signal被清空,然后方法终止。

使用Semaphore来产生信号

下面的例子中,两个线程通过Semaphore发出的信号的通知对方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SendingThread extends Thread{
Semaphore semaphore = null;
public SendingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
@Override
public void run(){
while (true){
this.semaphore.take();
}
}
}
public class RecevingThread extends Thread{
Semaphore semaphore = null;
public RecevingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
@Override
public void run(){
try {
while (true){
this.semaphore.release();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
RecevingThread receiver = new RecevingThread(semaphore);
receiver.start();
sender.start();
}
}

其他应用:

可以添加计数功能,添加上限,也可以当锁来使用

阻塞队列

阻塞队列与普通队列的区别在于,当 队列为空时,从队列中取元素的操作将会阻塞;或者当队列是满时,往队列里添加元素的操作将会阻塞。

阻塞队列的原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object item) throws InterruptedException{
while (this.queue.size() == this.limit){
wait();
}
if (this.queue.size() == 0){
notifyAll();
}
this.queue.add(item);
}
public synchronized Object dequeue() throws InterruptedException{
while (this.queue.size() == 0){
wait();
}
if (this.queue.size() == this.limit){
notifyAll();
}
return this.queue.remove(0);
}
}

必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

线程池

简单线程池的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThread> threads = new ArrayList<PoolThread>();
private boolean isStopped = false;
public ThreadPool(int noOfThread, int maxNoOfThread){
taskQueue = new BlockingQueue(maxNoOfThread);
for (int i=0; i < noOfThread; i++){
threads.add(new PoolThread(taskQueue));
}
for (PoolThread thread : threads){
thread.start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if (this.isStopped) throw new IllegalStateException("ThreadPool is stopped");
this.taskQueue.enqueue(task);
}
public synchronized void stop(){
this.isStopped = true;
for (PoolThread thread : threads){
thread.doStop();
}
}
}
public class PoolThread extends Thread{
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThread(BlockingQueue queue){
taskQueue = queue;
}
@Override
public void run(){
while (!isStopped){
try {
Runnable runnable = (Runnable) taskQueue.dequeue();
}catch (Exception e){
//写日志或者报告异常
//但保持线程池运行
}
}
}
public synchronized void doStop(){
isStopped = true;
this.interrupt();//打断池中线程的dequeue的调用
}
public synchronized boolean isStopped(){
return isStopped;
}
}

线程池的实现由两部分组成,类ThreadPool是线程池的公开接口,而类PoolThread用来实现执行任务的子线程。

为了执行一个任务,方法ThreadPool.execute(Runnable r) 用Runnable的实现作为调用参数,在内部,Runnable对象被放入阻塞队列(Blocking Queue),等待着被子线程取出队列。

一个空闲的 PoolThread 线程会把 Runnable 对象从队列中取出并执行。你可以在 PoolThread.run() 方法里看到这些代码。执行完毕后,PoolThread 进入循环并且尝试从队列中再取出一个任务,直到线程终止。

进一步有关线程池的知识:http://ifeve.com/java-threadpool/

CAS

1
2
3
4
5
6
public class MyLock {
private AtomicBoolean locked = new AtomicBoolean(false);
public boolean lock(){
return locked.compareAndSet(false, true);
}
}

剖析同步器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while (this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while (this.signals == 0) wait();
this.signals--;
this.notify();
}
}
  • 状态:同步器中的状态是用来确定某个线程是否有访问权限
  • 访问条件:访问条件决定调用test-and-set-state方法的线程是否可以对状态进行设置。
  • 状态变化:一旦一个线程获得了临界区的访问权限,它得改变同步器的状态,让其他线程阻塞,防止他们进入临界区。
    • 通知策略:一旦某个线程改变了同步器的状态,可能需要通知其他等待的线程状态已经变了。