JDK并发包

同步工具

ReentranLock

高版本的jdk中已经对synchronized做了足够多的优化,普通的场景下它的性能已经非常接近ReentrantLock。所以一些简单的场景不必过分纠结这两者之间的性能问题。
下面这个例子反而使用synchronized的耗时小于使用ReentrantLock。

可重入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReenterLock implements Runnable {
//可重入锁的使用
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for(int j=0; j<1000000;j++){
//加锁
lock.lock();
try{
i++;
}finally {
//释放锁
lock.unlock();
}
}
}

/* @Override
public void run() {
for(int j=0; j<1000000;j++) {
synchronized (this){
i++;
}
}
}*/

public static void main(String[] args) throws InterruptedException {
long time1 = System.currentTimeMillis();
ReenterLock tl = new ReenterLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();t2.start();
t1.join();t2.join();
long time2 = System.currentTimeMillis();
System.out.println(i);
System.out.println(time2-time1);
}
}

如果在一个线程中多次使用了加锁操作,就需要有对应数量的释放锁操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void run() {
for(int j=0; j<1000000;j++){
//加锁
lock.lock();
lock.lock();
try{
i++;
}finally {
//释放锁
lock.unlock();
lock.unlock();
}
}
}

下面我们加两次锁,只释放一次锁执行一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void run() {
for(int j=0; j<1000000;j++){
//加锁
lock.lock();
lock.lock();
try{
i++;
}finally {
//释放锁
lock.unlock();
}
}
}

就会发现程序卡住了,一直没有执行结束

我们通过使用jdk的两个命令来看看发生了什么

通过jps获取到这个进程的id
通过jstack看看这个进程的栈信息
可以看到Thread-1处于waiting状态一直在等在某种条件,就是等待另一个线程释放锁。
所在了第13行,也就是lock.lock();

可中断

在加锁的同时可以去响应中断,如果发生了死锁,或者意料之外的情况,在一个锁上卡了很久,还是有办法把这个线程唤醒,不至于永久性的卡死下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ReenterLockInt implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
/*
* 控制加锁顺序,方便构成死锁
* */
public ReenterLockInt(int lock){
this.lock = lock;
}

@Override
public void run() {
try{
if(lock == 1){
lock1.lockInterruptibly(); //代表可中断的加锁
try{
Thread.sleep(500);
}catch (InterruptedException e){}
lock2.lockInterruptibly();
}else{
lock2.lockInterruptibly();
try{
Thread.sleep(500);
}catch (InterruptedException e){}
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock1.isHeldByCurrentThread()){
lock1.unlock();
}
if(lock2.isHeldByCurrentThread()){
lock2.unlock();
}
System.out.println(Thread.currentThread().getId()+":线程退出。");
}
}

public static void main(String[] args) throws InterruptedException {
ReenterLockInt r1 = new ReenterLockInt(1);
ReenterLockInt r2 = new ReenterLockInt(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();t2.start();
Thread.sleep(1000);
}
}

上面这段代码,发生了死锁,程序无法正常结束。通过jstack可以看到发现了一个死锁。

实现是个守护线程,来触发死锁后的中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DeadLockChecker {

private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final static Runnable deadlockCheck = new Runnable() {
@Override
public void run() {
while (true){
long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
if(deadlockedThreadIds != null){
ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
for (Thread t : Thread.getAllStackTraces().keySet()){
for(int i = 0;i<threadInfos.length; i++){
if(t.getId() == threadInfos[i].getThreadId()){
t.interrupt();
}
}
}
}
try {
Thread.sleep(5000);
}catch (InterruptedException e){
}
}
}
};

public static void check(){
Thread t = new Thread(deadlockCheck);
t.setDaemon(true); //设置为守护线程
t.start();
}
}
1
2
3
4
5
6
7
8
9
10
11

public static void main(String[] args) throws InterruptedException {
ReenterLockInt r1 = new ReenterLockInt(1);
ReenterLockInt r2 = new ReenterLockInt(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();t2.start();
Thread.sleep(1000);
//开启死锁检测
DeadLockChecker.check();
}

可限时

使用lock.trylock(5,TimeUtil.SECOND)的方式获取锁。可以设置一个时间阈值,可以避免无限等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TimeLock implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try{
if(lock.tryLock(5, TimeUnit.SECONDS)){
System.out.println(Thread.currentThread().getId()+":get lock successed");
Thread.sleep(6000);
}else{
System.out.println(Thread.currentThread().getId()+":get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
public static void main(String[] args) {
TimeLock tl = new TimeLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();
t2.start();
}
}

公平锁

公平锁会保证获取锁的先到先得。公平锁不会产生饥饿问题,但是公平锁的性能要差很多,默认是非公平的。
可以通过构造方法指定为公平锁。

Condition

概念

类似于Object.wait()和Object.notify()与ReentrantLock结合使用。
Condition的执行方式,是当在线程1中调用await方法后,线程1将释放锁,并且将自己沉睡,等待唤醒,
线程2获取到锁后,开始做事,完毕后,调用Condition的signal方法,唤醒线程1,线程1恢复执行。

主要接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReenterLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
try{
lock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLockCondition tl = new ReenterLockCondition();
Thread t1 = new Thread(tl);
t1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();

}
}

API

await()方法会使当前线程等待,同时释放当前锁,当其他线程使用signal()时或者signalAll()方法时,线程会重新或得锁并继续执行。或者当线程中断时,也能跳出等待,这和Object.wait()方法类似。
awaitUninterruptibly() 和 await()基本相同,但是它并不会在等待的过程中响应中断。
signal()方法用于唤醒一个等待中的线程,相对的signalAll会唤醒所有等待中的线程,和Object.notify()类似。

Semaphore 信号量

锁是排他的,临界区只能有一个线程占用。
信号量是一种共享锁,信号量是许可若干个线程进入临界区,超过许可的上线就必须等待。
如果信号量的许可线程为1,就可以当成是排他锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SemapDemo implements Runnable{
final Semaphore semp = new Semaphore(5);
@Override
public void run() {
try{
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId()+":done");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semp.release();
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemapDemo semapDemo = new SemapDemo();
for (int i=0; i<20; i++){
exec.submit(semapDemo);
}
}
}

主要接口

acquire()
acquireUninterruptibly()
tryAcquire()
tryAcquire(long timeout, TimeUnit unit)
release()

ReadWriteLock

加锁之后,并行度就降为1,只有一个线程可以进入。我们需要通过操作对访问临界区的行为进行划分,当几个线程都是读取数据时,我们是不需要加锁的。

ReadWriteLock中的read线程都是无等待的并发。ReadWriteLock的实现ReentrantReadWriteLock,使用方法和ReentrantLock类似。

1
2
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

CountDownLatch

概念

倒数计时器
一种典型的场景就是火箭发射器,在火箭发射前,为了保证万无一失,往往需要对各个设备、仪表进行检查,它是一个点火线程,等待所有检查完工后,再执行。

主要接口

static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();

线程完成既定目标后执行end.countDown();
当计数器为零后 等待的主线程end.await() 继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CountDownLatchDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(10);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
try {
//模拟检查任务
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("check complete");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
end.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for(int i=0; i<10; i++){
exec.submit(demo);
}
//等待检查
end.await();
//发射火箭
System.out.println("fire!");
exec.shutdown();
}
}

CyclicBarrier

相当于一个循环的CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

public class CyclicBarrierDemo {
public static class Solder implements Runnable{
private String soldier;
private final CyclicBarrier cyclic;
Solder(CyclicBarrier cyclic,String soldierName){
this.cyclic = cyclic;
this.soldier = soldierName;
}
@Override
public void run() {
try{
cyclic.await();
doWork();
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {

}
}
void doWork(){
try{
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier+":任务完成");
}
}
public static class BarrierRun implements Runnable{
boolean flag;
int N;
public BarrierRun( boolean flag,int N){
this.flag = flag;
this.N = N;
}
@Override
public void run() {
if(flag){
System.out.println("司令:[士兵"+N+"个,任务完成!]");
}else{
System.out.println("司令:[士兵"+N+"个,集合完成!]");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N,new BarrierRun(flag,N));
System.out.println("集合队伍!");
for(int i=0;i<N;i++){
System.out.println("士兵"+i+"报道!");
allSoldier[i] = new Thread(new Solder(cyclic,"士兵"+i));
allSoldier[i].start();
// if(i==5){
// allSoldier[0].interrupt();
// }
}
}
}

LockSupport

提供线程阻塞原语。

主要接口

LockSupport.park();
LockSupport.unpark();

今天最好的表现,是明天最低的要求。