GuangchaoSun's Blog

Java线程内存模型-如何保证线程安全

volatile

volatile要求程序对变量的每次修改,都写回主内存,读变量是从主内存中读取,这样便对其他线程可见,解决了可见性问题,但是不能保证数据的一致性;特别注意,原子操作:根据Java规范,对于基本类型的赋值或者返回操作,是原子操作。但这里的基本数据类型不包括long和double,因为JVM看到的基本存储单位是32位,而long和double是64位。所以无法在一个时钟周期内完成。

通俗的讲一个对象的状态就是它的数据,存储在状态变量中,比如实力域或者静态域;无论何时,只要多于一个的线程访问给定的状态变量。而且其中某个线程会写入该变量,此时必须使用同步来协调线程对该变量的访问。

synchronized

同步锁:每个Java对象有且只有一个同步锁,任何时刻,最多只允许一个线程拥有这把锁。

当一个线程试图访问带有synchronized(this)标记的代码块时,必须获得this关键字应用对象的锁,在以下的两种情况下,此线程将有不同的命运:

  1. 假如这个锁已经被其他的线程占用,JVM就会把这个线程放到本对象的锁池中。本线程进入阻塞状态。锁池可能有很多线程,等到其他的线程释放了锁,JVM就会从锁池中随机取出一个线程,使这个线程拥有锁,并且转到就绪状态。
  2. 假如这个锁没有被占用,本线程会获得这把锁,开始执行同步代码块。(一般情况下在执行同步代码块时不会释放同步锁,但也有特殊情况会释放同步锁:如在执行同步代码块时,遇到异常锁而导致线程终止,锁会被释放;在执行代码块时,执行了锁所属对象的wait()方法,这个线程会释放对象锁,进入对象的等待池中)

synchronized关键字保证了数据读写一致和可见性等问题,但他是一种阻塞线程控制方法,在关键字使用期间,所有其他线程不能使用该变量,这就引出了一种叫做非阻塞同步的控制线程安全的需求。

ThreadLocal解析

根据名称上来说他是local variable(线程局部变量)。它的功能很简单,就是为每一个使用该变量的线程都提供一个变量值的副本,是每一个线程都可以独立地改变自己的副本,而不会和其他线程冲突。从线程的角度看,就好像每一个线程都完全拥有该变量。

每个线程都保持对其线程局部变量副本的隐式引用,只要线程是活动的并且ThreadLocal实例是可访问的;在线程消失之后,其线程局部实例的所有副本都会被垃圾回收(除非存在对这些副本的其他引用)。

Object wait()和notify()方法解析

Object的wait()和notify()、notifyAll()方法,使用一个对象作为锁,然后调用wait()就会挂起当前线程,同时释放对象锁。

  • notify()使用首先获取对象锁然后才能唤醒被挂起的线程(因为等待对象锁而挂起的)。
  • notifyAll()唤醒在此对象监视器上等待的所有线程。
  • wait()在其他线程调用对象的notify()或notifyAll()方法前,导致线程等待。
    抛出IllegalMonitorStateException-如果当前线程不是此对象监视器的所有者。

用wait,notify解决生产者消费者问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by 11981 on 2017/12/28.
*/
class Meal{
private int orderNum;
public Meal(int orderNum){
this.orderNum = orderNum;
}
public String toString() {
return "Meal : " + orderNum;
}
}
class WaitPerson implements Runnable{
private Restaurant restaurant;
public WaitPerson(Restaurant r){
this.restaurant = r;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
synchronized (this){
while (restaurant.meal == null){
wait();//for the chef to produce a meal
}
}
System.out.println("WaitPerson got" + restaurant.meal);
synchronized (restaurant.chef){
restaurant.meal = null;
restaurant.chef.notifyAll();
}
}
TimeUnit.MICROSECONDS.sleep(100);
}catch (InterruptedException e){
System.out.println("WaitPerson interrupted");
}
}
}
class Chef implements Runnable{
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant restaurant){
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
synchronized (this){
while (restaurant.meal != null){
wait();//wait meal to be taken
}
if (++count == 10){
System.out.println("Out of food, closing");
restaurant.exec.shutdown();
}
System.out.println("Order up");
synchronized (restaurant.waitPerson){
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
}
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant(){
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args){
new Restaurant();
}
}

用ArrayBlockingQueue解决生产者消费者问题:默认使用的是非公平锁

take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止,若请求不到则此线程被加入阻塞队列;
如使用公平锁,当有内容可以消费时,会从队首取出消费者线程进行消费(即等待时间最长的线程)
add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则导致异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created by 11981 on 2017/12/28.
*/
public class TestBlockingQueues {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(20);
Thread pro = new Thread(new Producer(queue), "生产者");
pro.start();
for (int i=0; i < 10; i++){
Thread t = new Thread(new Consumer(queue), "消费者 " + i);
t.start();
}
}
}
class Producer implements Runnable{
BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
int i=0;
while (true){
try {
System.out.println("生产者生产食物,食物编号为:" + i);
queue.put("食物" + i++);
Thread.sleep(1000);
}catch (InterruptedException e){
System.out.println("生产者被中断");
}
}
}
}
class Consumer implements Runnable{
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
System.out.println(Thread.currentThread().getName() + "消费"
+ queue.take());
}catch (InterruptedException e){
System.out.println("消费者被中断");
}
}
}
}