同步机制

概念

  • 线程安全

    一个类在多线程环境下始终表现出正确的行为

  • 无状态对象、不可变对象

    对象内无任何域,无其他对象的引用,无状态对象是线程安全的

  • 竞态条件

    由于不恰当的执行时序导致不正确的结果

  • 可见性

    一个线程修改了对象状态,其他线程能够看到这个变化

  • 重排序

    没有同步的情况下,底层会对执行顺序进行调整

  • 线程状态

    New(创建线程未start)、Runnable(start但未分配CPU)、Running、Blocked(阻塞让出CPU时间)、waiting(等待)

  • 线程封闭

    避免使用同步,仅在单线程内访问数据的方式: 局部变量和ThreadLocal

  • 重入

    某个线程试图获取自己持有的锁,请求可以成功

  • 如何实现线程安全

    线程安全实现围绕三个特性进行解决,原子性、可见性、有序性
    1、线程安全类
    2、无状态对象、线程封闭 (不共享资源)
    3、原子类 atomic包 (原子性)
    4、volatile (可见性,volatile变量修改后会刷新到主存中,有序性,volatile会通知低层禁止重排序)
    5、使用并发集合ConcurrentHashMap
    6、同步机制(原子性)

线程不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 以下逻辑如果没有source这个资源共享的话,SynchronizedTest属于无状态对象,没有线程安全问题,source在多线程环境下会导致不正确行为
/**
某次的执行结果
1.start
2.start
1.execute.source:2
1.end
2.execute.source:2
2.end
*/
public class SynchronizedTest {
private int source = 0;
private void execute(long ms, String info) {
try {
Thread.sleep(ms);
source++;
System.out.println(info + ".source:" + source);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void method(String who) {
System.out.println(who + ".start");
execute(1000, who + ".execute");
System.out.println(who + ".end");
}
public static void main(String[] args) {
SynchronizedTest sync = new SynchronizedTest();
new Thread(() -> sync.method("1")).start();
new Thread(() -> sync.method("2")).start();
}
}

同步机制(显式锁)

协调访问共享资源的机制, 有以下方法
synchronized(内置锁)
ReentrantLock
ReadWriteLock
StampedLock

synchronized

method synchronized

When a method is synchronized, it locks the Object, if method is static it locks the Class
When we lock an Object, it acquires lock on all the fields of the Object
JVM指令, 通过下面两个指令实现同步效果(对象会关联一个monitor,获得monitor就获得锁定状态)
monitorenter: 如果为0,线程进入设置1,线程重入加1
monitorexit: 线程退出减1,直到减为0

  • 1、普通方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class SynchronizedTest {
    private int source = 0;
    private void execute(long ms, String info) {
    try {
    Thread.sleep(ms);
    source++;
    System.out.println(info + ".source:" + source);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    public synchronized void method(String who) {
    System.out.println(who + ".start");
    execute(1000, who + ".execute");
    System.out.println(who + ".end");
    }

    public static void main(String[] args) {
    SynchronizedTest sync = new SynchronizedTest();
    new Thread(() -> sync.method("1")).start();
    new Thread(() -> sync.method("2")).start();
    }
    }
  • 2、静态方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 静态方法中只能有静态变量,所以资源会共享给下一个线程, 所以这个结果还是线程安全的
    /**
    1.start
    1.execute.source:1
    1.end
    2.start
    2.execute.source:2
    2.end
    */
    public class SynchronizedTest {
    private static int source = 0;
    private static void execute(long ms, String info) {
    try {
    Thread.sleep(ms);
    source++;
    System.out.println(info + ".source:" + source);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    public synchronized static void method(String who) {
    System.out.println(who + ".start");
    execute(1000, who + ".execute");
    System.out.println(who + ".end");
    }
    public static void main(String[] args) {
    new Thread(() -> method("1")).start();
    new Thread(() -> method("2")).start();
    }
    }

synchronized block

It is preferable to create a dummy private Object to use for synchronized block, so that it’s reference can’t be changed by any other code. For example if you have a setter method for Object on which you are synchronizing, it’s reference can be changed by some other code leads to parallel execution of the synchronized block (使用对象进行加锁时候,最好使用私有对象,防止对象被修改,然后破坏同步块导致并行执行)

  • 3、代码块
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    // start没有同步,execute和end进行同步
    /**
    1.start
    2.start
    1.execute.source:1
    1.end
    2.execute.source:1
    2.end
    */
    public class SynchronizedTest {
    private int source = 0;
    private void execute(long ms, String info) {
    try {
    Thread.sleep(ms);
    source++;
    System.out.println(info + ".source:" + source);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    public void method(String who) {
    System.out.println(who + ".start");
    synchronized(new Test()) { // new SynchronizedTest()、this效果一样
    execute(1000, who + ".execute");
    System.out.println(who + ".end");
    }
    }
    public static void main(String[] args) {
    SynchronizedTest sync1 = new SynchronizedTest();
    SynchronizedTest sync2 = new SynchronizedTest();
    new Thread(() -> sync1.method("1")).start();
    new Thread(() -> sync2.method("2")).start();
    }
    class Test{
    }
    }

    // 对象被修改的错误例子
    // 预期结果是线程1修改了source=1后,线程2再修改source=2,但以下加锁的对象不一样,所以每次执行结果都不一样
    /**
    1.start
    2.start
    1.execute.source:2
    2.execute.source:2
    2.end
    1.end
    */
    public class SynchronizedTest {
    private int source = 0;
    private void execute(long ms, String info) {
    try {
    Thread.sleep(ms);
    source++;
    System.out.println(info + ".source:" + source);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    public void method(String who, LockObject lock) {
    System.out.println(who + ".start");
    synchronized(lock) {
    execute(1000, who + ".execute");
    System.out.println(who + ".end");
    }
    }
    public static void main(String[] args) {
    SynchronizedTest sync1 = new SynchronizedTest();
    // LockObject lock = new LockObject();
    new Thread(() -> sync1.method("1", new LockObject())).start();
    new Thread(() -> sync1.method("2", new LockObject())).start();
    }
    }
    class LockObject{
    }

ReentrantLock

  • ReentrantLock 相比 synchronized的差别

    1、公平性(fairness):默认构造函数创建非公平同步,设置公平性后,优先由等待时间长的线程获得锁;
    2、tryLock() 方法返回尝试获得锁的结果,可以设置等待时间,减少线程阻塞;
    3、lockInterruptibly() 方法可以中断等待中的线程; FIXME
    4、可以获取等待锁的线程列表,可以获得当前锁的计数;
    5、必须有try-finally,最后需要unlock释放。

  • demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
class worker implements Runnable {
String name;
ReentrantLock re;

public worker(ReentrantLock rl, String n) {
re = rl;
name = n;
}

public void run() {
boolean done = false;
while (!done) {
boolean ans = false;
try {
ans = re.tryLock(6, TimeUnit.SECONDS);//Getting Outer Lock
} catch (InterruptedException e) {
e.printStackTrace();
}
if (ans) {
try {
log("task name - %s outer lock acquired at %s Doing outer work", name, now());
sleep(1500);
re.lock(); // Getting Inner Lock
try {
log("task name - %s inner lock acquired at %s Doing inner work", name, now());
log("Lock Hold Count - %s", re.getHoldCount());
sleep(1500);
} catch (Exception e) {
e.printStackTrace();
} finally {
log("task name - %s releasing inner lock", name);
re.unlock();//Inner lock release
}
log("Lock Hold Count - %s", re.getHoldCount());
log("task name - %s work done", name);
done = true;
} catch (Exception e) {
e.printStackTrace();
} finally {
log("task name - %s releasing outer lock", name);
re.unlock(); //Outer lock release
log("Lock Hold Count - %s", re.getHoldCount());
}
} else {
log("task name - %s waiting for lock", name);
sleep(1000);
}
}
}

public String now() {
Date d = new Date();
SimpleDateFormat ft = new SimpleDateFormat("hh:mm:ss");
return ft.format(d);
}

private void log(String text, Object... args) {
System.out.println(String.format(text, args));
}

private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

public class Test {
static final int MAX_T = 2;

public static void main(String[] args) {
ReentrantLock rel = new ReentrantLock();
ExecutorService pool = Executors.newFixedThreadPool(MAX_T);
Runnable w1 = new worker(rel, "Job1");
Runnable w2 = new worker(rel, "Job2");
pool.execute(w1);
pool.execute(w2);
pool.shutdown();
}
}

// output
task name - Job1 outer lock acquired at 05:04:40 Doing outer work
task name - Job1 inner lock acquired at 05:04:41 Doing inner work
Lock Hold Count - 2
task name - Job1 releasing inner lock
Lock Hold Count - 1
task name - Job1 work done
task name - Job1 releasing outer lock
Lock Hold Count - 0
task name - Job2 outer lock acquired at 05:04:43 Doing outer work
task name - Job2 inner lock acquired at 05:04:44 Doing inner work
Lock Hold Count - 2
task name - Job2 releasing inner lock
Lock Hold Count - 1
task name - Job2 work done
task name - Job2 releasing outer lock
Lock Hold Count - 0

ReadWriteLock

ReadWriteLock为了解决上面两个同步机制的独占问题,特别是读多写少的场景下,读线程之间并不需要同步,ReadWriteLock只在写之间和读写之间进行同步,但设计上更复杂了,在实际应用中性能依然不怎么好
Read Access If no threads are writing, and no threads have requested write access.
Write Access If no threads are reading or writing.
ReadWriteLock的一个实现
RW Locks use Compare-And-Swap (CAS) operations to set values directly into memory as part of their thread queuing algorithm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// demo
// ReadWriteLock allows multiple concurrent readers abut only one exclusive writer.
// ReentrantReadWriteLock is an implementation of ReadWriteLock. In addition, it allows a reader/writer thread acquire a read lock/write lock multiple times recursively (reentrancy).
// Use the read lock to safeguard code that performs read operations, and use the write lock to protect access to code that performs update operation.
// In practice, ReadWriteLock can be used to increase throughput for shared data structure like cache or dictionary-like data which the update is infrequent and read is more frequent
public class ReadWriteList<E> {
private List<E> list = new ArrayList<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();

public ReadWriteList(E... initialElements) {
list.addAll(Arrays.asList(initialElements));
}

public void add(E element) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();

try {
list.add(element);
} finally {
writeLock.unlock();
}
}

public E get(int index) {
Lock readLock = rwLock.readLock();
readLock.lock();

try {
return list.get(index);
} finally {
readLock.unlock();
}
}

public int size() {
Lock readLock = rwLock.readLock();
readLock.lock();

try {
return list.size();
} finally {
readLock.unlock();
}
}

}

StampedLock(Java8)

  • 特点

    stampedlock分析
    stampedLock vs sync… vs RWlock
    同步实现counter的几种方式
    RWLock锁的改进,由于RWLock的读写锁都是悲观锁,读和写之间是互斥的,在高并发的情况下,会导致写长期处于饥饿状态,为解决这种问题,引入StampedLock,StampedLock提供读、写和乐观读锁、锁获取时候会返回一个票据,表达对应的锁状态,0表示没有写锁访问,乐观读不会阻塞写;
    不支持重入性

  • 底层原理:CLH自旋锁

    CLH锁是一种自旋锁:锁维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程),保存一个标记位(locked),用于判断当前线程是否已经释放锁,当一个线程试图获得锁时,取得当前等待队列的尾部节点作为其前序节点,并使用类似如下代码判断前序节点是否已经成功释放:while(pred.locked) {}
    只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行,因此会自旋等待,反之,如果前序线程已经释放锁,则当前线程可以继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// javadoc 的例子
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

public void move(double deltaX, double deltaY) { // an exclusively locked method
/**
* stampedLock调用writeLock和unlockWrite时候都会导致stampedLock的state(锁状态)属性值的变化
* 即每次高8位 +1,直到加到最大值,然后从0重新开始.
* 当锁被写模式所占有,没有读或者乐观的读操作能够成功。
*/
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
//释放写锁
sl.unlockWrite(stamp);
}
}

public double distanceFromOrigin() { // A read-only method
/**
* tryOptimisticRead是一个乐观的读,使用这种锁的读不阻塞写
* 每次读的时候得到一个当前的stamp值
*/
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;

/**
* validate()方法校验从调用tryOptimisticRead()之后有没有线程获得写锁,
* true:无写锁,state与stamp匹配
* false:有写锁,state与stamp不匹配,或者stamp=0(调用tryOptimisticRead()时已经被其他线程持有写锁)
*/
if (!sl.validate(stamp)) {
/**
* 被写锁入侵需要使用悲观读锁重读,阻塞写锁(防止再次出现脏数据) 或者 等待写锁释放锁
* 当然重读的时候还可以使用tryOptimisticRead,此时需要结合循环了,即类似CAS方式
*/
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
//释放读锁
sl.unlockRead(stamp);
}
}
return (currentX + currentY);
}

/**
* 初始化 x,y
* @param newX
* @param newY
*/
public void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
/**
* 尝试转换成写锁
* 0:获得写锁失败
* 非0:获得写锁成功
*/
long ws = sl.tryConvertToWriteLock(stamp);
//持有写锁
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
//否则调用writeLock()直到获得写锁
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
//释放锁,可以是writeLock,也可是readLock
sl.unlock(stamp);
}
}
}

wait/notify/notifyAll

这三个Object里面的方法,用于进行线程之间的协作,wait和notify都是针对对象的Monitor,而monitor都归属于对象,所以说这三个方法在Object上,由于monitor的作用是锁,所以wait和notify都必须在当前获得了锁的所有权才能执行,所以必须在同步代码块里面
wait与sleep的区别是,sleep只是让出cpu的执行权,并不释放锁,而wait会释放锁,然后等待其他线程释放锁后notify
Thread上有两个方法:yield和join, yield会暂停一下当前线程一段时间,该时间无法指定,主要用于调试,join是将多个线程并发变成同步状态;

Condition条件变量

Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods (Condition在Lock的应用里面替换wait/notify的作用)
在同步代码块中调用await后等待,等待其他线程signal,当发出signal时具体会唤起哪个等待的线程不确定
ArrayBlockingQueue原理和使用 FIXME
相比wait/notify的好处
1、提供多组等待阻塞的操作(经典的生产消费者问题)
2、提供awaitUntil(Date deadline)方法指定到期时间
3、提供awaitUninterruptibly()方法解决不希望线程被中断的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

Semaphore (AQS)

信号量也是同步方式的一种,内部机制使用一个counter控制对共享资源的访问,信号量计数大于0时线程获取一个许可,然后计数减一,否则会阻塞等待,线程执行完后释放许可,计数加1,等待许可的线程获取执行权限;

  • 1、用于线程之间通信的信号量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    public class SimpleSemaphore {
    private boolean signal = false;
    public synchronized void take() {
    this.signal = true;
    this.notify();
    }
    public synchronized void release() throws InterruptedException {
    while (!this.signal) wait();
    this.signal = false;
    }
    }
    //
    public class SignalDemo {
    public static void main(String[] args) {
    SimpleSemaphore semaphore = new SimpleSemaphore();
    SendingThread sender = new SendingThread(semaphore);
    ReceivingThread receiver = new ReceivingThread(semaphore);
    receiver.start();
    sender.start();
    /**
    ouput:
    release start....
    do something, then signal
    take start....
    take end....
    release start....
    receive signal, then do something...
    release start....

    receiver进入后会wait,然后等待sender处理完后notify,重新进入获取到信号,然后执行下一步
    */
    }
    }

    class SendingThread extends Thread {
    SimpleSemaphore semaphore = null;

    public SendingThread(SimpleSemaphore semaphore) {
    this.semaphore = semaphore;
    }

    public void run() {
    System.out.println("do something, then signal");
    this.semaphore.take();
    }
    }

    class ReceivingThread extends Thread {
    SimpleSemaphore semaphore = null;

    public ReceivingThread(SimpleSemaphore semaphore) {
    this.semaphore = semaphore;
    }

    public void run() {
    while (true) {
    try {
    this.semaphore.release();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("receive signal, then do something...");
    }
    }
    }
  • 2、用synchronized同步方式实现信号量加锁机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // bound 设定可访问上限的线程,设置为1则可以实现锁的机制
    public class BoundedSemaphore {
    private int signals = 0;
    private int bound = 0;

    public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
    }

    public synchronized void take() throws InterruptedException {
    while(this.signals == bound) wait();
    this.signals++;
    this.notify();
    }

    public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    this.signals--;
    this.notify();
    }
    }
  • 3、jdk5提供的Semaphore类,跟2类似机制,但是相反的逻辑,通过许可的设置来获取权限,底层使用Synchronizer实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    /** 
    * Semaphores are often used to restrict the number of threads than can
    * access some (physical or logical) resource. For example, here is
    * a class that uses a semaphore to control access to a pool of items:
    */
    class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
    available.acquire();
    return getNextAvailableItem();
    }

    public void putItem(Object x) {
    if (markAsUnused(x))
    available.release();
    }
    // Not a particularly efficient data structure; just for demo
    protected Object[] items = ... whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];
    protected synchronized Object getNextAvailableItem() {
    for (int i = 0; i < MAX_AVAILABLE; ++i) {
    if (!used[i]) {
    used[i] = true;
    return items[i];
    }
    }
    return null; // not reached
    }

    protected synchronized boolean markAsUnused(Object item) {
    for (int i = 0; i < MAX_AVAILABLE; ++i) {
    if (item == items[i]) {
    if (used[i]) {
    used[i] = false;
    return true;
    } else
    return false;
    }
    }
    return false;
    }

CountDownLatch工具 (AQS)

As per java docs, CountDownLatch is synchronisation aid that allow one or more threads to wait until set of operations being performed in other threads completes
缺点是当CountDownLatch的count一旦减少为0,没办法在进行复用,CountDownLatch 典型应用场景是应用等待一些初始化操作,例如日志、数据库连接初始化等等,需要等待一些特殊初始化操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 以下这种场景可以用join实现,但如果是使用ExecutorService则只有CountDownLatch适用
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...

for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));

doneSignal.await(); // wait for all to finish
}
}

class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}}

CyclicBarrier工具

CyclicBarrier是CountDownLatch的一个变形模型,多个线程之间处理后等待都处理完,执行公共事件,然后继续处理,同时可复用counter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;

class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);

try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);

List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}

// wait until done
for (Thread thread : threads)
thread.join();
}
}

CountDownLatch vs CyclicBarrier

Parameter CountDownLatch CyclicBarrier
Reuse It can not be reused once count reaches 0 It can be reinitialized once parties reaches to 0, so it can reused
Method It calls countDown() method to reduce the counter It calls await() method to reduce the counter.
Common Event It can not trigger common event when count reaches 0 It can trigger common event (Runnable) once reaches to a barrier point. Constructor:CyclicBarrier(int parties, Runnable barrierAction)
Constructor CountDownLatch(int count) CyclicBarrier(int parties)

Guava Monitor

FIXME

其他概念

锁升级(膨胀)、锁降级

内置锁底层是由monitor来实现的,Java6之前依赖操作系统的互斥锁,Java6后提供了三种monitor实现,分别是偏斜锁、轻量级锁、重量级锁,在不同的竞争环境下,jdk会对内置锁进行优化调整使用锁的类型,这种优化就是锁升级和锁降级;

偏向锁(偏斜锁)

内置锁默认在无竞争的情况下使用偏斜锁,使用CAS操作在对象头的mark word部位标记线程ID,表示该对象偏向该线程,降低无竞争情况的开销;

轻量级锁、重量级锁

当出现竞争情况时候,会撤销偏斜锁,使用CAS操作获取对象的锁,操作成功则使用了普通的轻量级锁,否则继续升级为重量级锁;

自旋锁

竞争锁失败时候不会阻塞挂起,而是空循环尝试获取锁,这种锁场景下认为持有锁的时间小于阻塞唤醒是时间,所以使用空循环更能提升性能,空循环会耗费cpu资源,所以在单核CPU下是无用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 可重入的自旋锁实现
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
private int count = 0;
public void lock() {
Thread currentThread = Thread.currentThread();

if (currentThread == owner.get()) {
count++;
return;
}

while (!owner.compareAndSet(null, currentThread)) { // cas 空循环
}
}
public void unlock() {
Thread currentThread = Thread.currentThread();
if (owner.get() == currentThread) {
if (count != 0) {
count--;
} else {
owner.compareAndSet(currentThread, null);
}

}
}
}

虚假唤醒(Spurious Wakeup)