Java编程原理——并发

并发基础

  1. 线程表示一条单独的执行流,它有自己的执行计数器,有自己的栈;
  2. 使用线程的方式有两种,继承Thread类以及实现Runnable接口,两种达到的效果是一样的。
  3. 线程具有下列状态:
  • NEW:没有调用start方法线程状态;
  • TERMINATED:线程运行结束后状态;
  • RUNNABLE:调用start后线程执行run方法且没有阻塞时状态;
  • BLOCKED、WAITING、TIMED_WAITING:线程被阻塞;
  1. join方法:可以让调用join方法的线程等待该线程结束;
  2. 竞态条件:当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确;

synchronized关键字

  1. synchronized实例方法实际保护的是同一个对象的方法调用,每次只能被一个线程持有;
  2. synchronized关键字保护的是对象,针对实例对象保护的是当前,对于静态方法保护的类对象;
  3. 被synchronized关键字保护的对象都维护着1个锁以及等待队列;
  4. synchronized具有下列特点:
  • 可重入性:对于同一个执行线程在获得锁之后,在调用其他需要同样锁的代码时,可以直接调用;
  • 内存可见性:释放锁时会将所有的更改写回内存;但是开销有些过高,可以使用volatile关键字实现相同的效果;
  • 死锁:尽量避免在持有一个锁的同时再去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁;(使用开发过程中可以使用JDK提供的jstack检测死锁)
  1. 对于同步容器对象,单个操作是线程安全的,迭代却不是线程安全的。(因为在多线程操作中容易对容器产生结构性变化);
  2. 如果需要在迭代容器的过程中保证线程安全,需要在遍历整个容器的过程中给整个容器对象加锁;
  3. 除11中所写到的方法,推荐使用并发容器类:
  • CopyOnWriteArrayList
  • ConcurrentHashMap;
  • CocurrentLinkedQueue;
  • ConcurrentSkipSet;

线程协作机制

  1. 生产/消费者模式:生产者生产数据并放至消费队列中,消费者从消费队列中消费数据,如果队列为空消费者等待,如果队列满,生产者等待;
  2. 同时开始:在模拟仿真程序中,多个线程同时开始;
  3. 等待结束:主线程将任务拆解成多个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕;
  4. 异步结果:将子线程封装为异步调用,子线程在正确返回结果前先返回一个Future对象,通过Future对象可以获得子线程最终的执行结果;
  5. Map/Reduce操作:也需要使用线程进行操作;

wait/notify机制:

  1. 每个同步对象都拥有一个锁以及一个等待队列,当调用wait方法,系统会将对象放入等待队列,直到调用notify方法再将其从等待队列中取出。
  2. 设计多线程程序的时候需要想清楚共享变量和条件,这是设计多线程协作的核心。
  3. 以生产者/消费者模式为例:生产者生产产品,并放入缓冲队列;消费者从缓冲队列取出产品进行消费。当生产者生产速度过快导致缓冲队列满了,此时生产者应停止生产。同样,当生产者生产过慢导致缓冲队列空,此时消费者应停止消费。等待生产者生产。
  4. 同时进行:一个主线程以及N个子线程,当主线程发送指令后,子线程开始同时执行;
  5. 各个线程分头行动,给咱达到一个集合点,在集合点需要集齐所有线程交换数据。然后再进行下一步动作。

取消/关闭线程机制

  1. 停止一个线程的手段主要是中断,中断线程的目的并不是强迫线程终止,而是给线程发送一个停止信号,由线程自行决定中断的时机。本质上依旧是一种协作机制。
  2. Java中关于中断的方法有如下三种:
1
2
3
public boolean isInterrupted(); // 返回对应线程的中断标志位是否为true
public void interrupt(); // 表示中断对应的线程
public static boolean interrupted(); // 返回当前线程的中断标志位是否为true,但还有一个副作用:清空中断标志位。
  1. 对于以线程提供的服务的应用程序模块而言,应该封装取消/关闭操作,提供单独的取消/关闭方法给调用者,外部调用库应该调用这些方法而不是直接调用interrupt方法;

并发包基石

原子变量

  1. 原子变量即为操作一次性完成,期间不进行线程上下文切换,并且能保证线程安全的原子变量。
  2. 原子变量有AtomicBoolean, AtomicInteger, AtomicLong以及AtomicReference(原子引用类型)类型
  3. 之所以称之为原子变量,是因为其包含一些原子方法实现组合操作的方法
  4. compareAndSet方法,也就是常说的CAS。如果当前值等于expect,则更新为update,否则不更新。如果更新成功,返回true,否则返回false。
  5. 乐观锁与悲观锁的区别:乐观锁假设每次操作都是不会出现冲突,因此在更新时不管是否出现冲突都会先进行一次更新,即便冲突了只需要再检测一次即可;而别管锁假设每次操作都会产生冲突,因此每次操作前都需要获得锁才能继续操作(例如:synchronized关键字);
  6. 使用CAS会出现一个ABA问题:假设当前值为A,如果另一个线程将A改为B然后再修改回A,当前线程的CAS操作是无法分辨当前值发生过变化的。解决ABA可以在线程修改值时加上一个时间戳,以此来判断当前线程持有变量的版本;
1
2
3
4
5
Pair pair = new Pair(100, 200);
int stamp = 1;
AtomicStampedReference<Pair> pairRef = new AtomicStampedReference<>(pair, stamp);
int newStamp = 2;
pairRef.copareAndSet(pair, new Pair(200, 200), stamp, newStamp);
  1. 总结:CAS是并发包的基础,基于它可以实现高效、乐观、非阻塞式数据结构的算法,它也是并发包中锁、同步工具以及各种容器的基础;

显式锁

  1. 包括锁Lock,主要实现类为ReentrantLock;读写锁接口ReadWriteLock,主要实现类是ReetrantReadWriteLock;
  2. 显式锁一般有下列方法:
  • lock()/unlock():普通获取锁和释放锁方法;会阻塞直至成功;
  • lockInterruptibly():可以响应中断,如果被其他线程中断会抛出InterruptedException;
  • tryLock():只是尝试获取锁,立即返回;如果获取成功返回true,否则返回false;
  • newCondition():新建一个条件
  1. 相比于synchronized,显式锁支持非阻塞方式获取锁、可以响应中断、可以限时;
  2. Lock是可重入的(一个线程持有一个锁的前提下可以继续获得该锁)、可以解决竞态条件、可以保证内存的可见性(volatile)
  3. 保证竞态条件的公平性会影响性能,一般情况下我们不作保证;例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Counter {
private final Lock lock = new ReetrantLock();
private volatile int count;

public void incr() {
lock.lock(); // 使用lock获得显式锁
try {
count++;
} finally {
lock.unlock(); // 用完要及时释放掉
}
}
public int getCount() {
return count;
}
}
  1. 使用tryLock()可以避免死锁,在持有一个锁获另一个锁而获取不到的时候,可以释放自己已持有的锁,给其他线程获得锁的机会,然后重试获取所有锁;例如下面这样写会产生死锁,使用tryLock()判断可以避免死锁问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Account.java
public class Account {
private Lock lock = new ReetrantLock();
private volatile double money;

public Account(double money) {
lock.lock();
try {
this.money += money;
} finally {
lock.unlock();
}
}

public void reduce(double money) {
lock.lock();
try {
this.money -= money;
} finally {
lock.unlock();
}
}

public double getMoney() {
return money;
}

void lock() {
lock.lock();
}

void unlock() {
lock.unlock();
}

boolean tryLock() {
return lock.tryLock();
}
}

// AccountMgr.java
public class AccountMgr {
public static class NoEnoughMoneyException extends Exception{}
public static void transfer(Account from, Account to, double money)throws NoEnoughMoneyException {
from.lock(); // from获得锁
try {
to.lock(); // to获得锁
try {
if (from.getMoney() >= money) {
from.reduce(money);
to.add(money);
} else {
throw new NoEnoughMoneyException();
} finally {
to.unlock(); // to释放锁
}
} finally {
from.unlock(); // from释放锁
}
}
}
}
  1. Java中封装了一个抽象类AQS用于简化并发工具类的实现。AQS全称(AbstractQueuedSynchronizer)。AQS的实现比较复杂。AQS内部维护了一个等待队列,借助CAS方法实现了无阻塞算法进行更新。
  2. Lock锁的基本原理如下:能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁则返回;否则继续等待,这个过程如果发生了中断,lock会记录中断标志位,但不会提前返回或者是抛出异常;
  3. 保证公平性的结果是整体性能会降低,低的原因不在于检查速度慢,而是会让活跃线程得不到锁,进入等待状态,引起频繁上下文切换,降低整体的效率。需要说明的是即便fair参数为true,ReetrantLock中不带参数的tryLock方法也是不保证公平的。
  4. synchronized关键字是声明式编程,而Lock是命令式编程,所有内部的细节都需要自己实现。目前JVM对synchronized的优化已经相当的好。因此能用synchronized的地方建议使用synchronized,不能使用synchronized关键字的地方再考虑Lock;

显式条件

  1. 锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronized相对应,显式条件和wait/notify相对应。
  2. 实现显式锁的显式条件为Condition类,该类为一个接口,其中包含下列方法:其中绝大部分是响应中断的。即:当出现中断会清空标志位:
1
2
3
4
5
6
7
void await() // 相当于Object中的wait();
void awaitUnInterruptibly(); // 不响应中断
long awaitNanos(long nanoTimeout);
boolean await(long times, TimeUnit unit);
boolean awaitUntil(Date deadline);
void signal(); // 相当于notify();
void signalAll(); // 相当于notifyAll();
  1. await在进入等待队列后,会释放锁以及CPU,当其他线程将它唤醒后、等待超时后以及发生中断退出后都需要重新获得锁,获取锁后才从await方法中退出。
  2. 使用显式锁的示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class WaitThread extends Thread {
private volatile boolean fire = false;
private Lock lock = new ReetrantLock();
private Condition condition = lock.newCondition();

@Override
public void run() {
try {
lock.lock();
try {
while(!fire) {
condition.await();
}
} finally {
lock.unlock();
}
System.out.println("fired");
} catch (InterruptedException e) {
Thread.interrupted();
}
}

public void fire() {
lock.lock();
try {
this.fire = true;
condition.signal();
} finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException{
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}
  1. await()/signal()方法与显式锁配合使用;而wait()/notify()要与synchronized方法使用,不能使用混淆。否则即便编译器不报错也会出现IllegalStateException方法。

并发容器

CopyOnWriteArrayList以及CopyOnWriteArraySet

  1. 特点:线程安全,可以被多个线程并发访问;
  2. 迭代器不支持修改操作,但也不会抛出ConcurrentModificationException;
  3. 以原子方式支持一些复合操作
  4. 写时复制的技术思路为:每次出现修改操作时,都会新建一个数组,复制原数组内容到新数组,在新数组上进行需要的修改,然后一原子方式设置内部数组引用。同理:所有的读操作都是先拿到当前引用的数组,然后直接访问该数组。在读的过程中,可能内部的数组引用已经被修改了,但不会影响操作,它依旧访问原数组内容。
  5. 截至当前保证线程安全的思路有三种:
  • 使用锁:例如synchronized关键字以及ReetrantLock;
  • 使用CAS:
  • 写时复制也是一种常见的线程安全保证思路,在操作系统内部的进程管理和内存管理经常会使用到。
  1. 总结:CopyOnWriteList和CopyOnWriteSet适用于读远多于写、集合不太大的场景,它们采用了写时复制技术,这也是计算机中一种重要的思维和技术。

ConcurrentHashMap

  1. 特点:并发安全、支持一些原子复合操作、支持高并发,读完全并行、弱一致性;
  2. HashMap不是线程安全的,在高并发场景下容易出现死循环以及占满CPU的问题;
  3. Collections.synchronizedMap()在调用一些更新操作时需要加锁,而ConcurrentHashMap()就不存在这个问题;
  4. ConcurrentHashMap使用了很复杂的技术实现高并发,但是简要概括下有下列两种:
  • 分段锁:将读取的数据分为多段,每段都拥有自己的锁,每个段都相当于独立的Hash表。使用分段锁可以有效地提高并发效率。
  • 读不需要锁:对于写操作需要获取锁,不能并行,但是读操作可以并行。写的同时也可以读。
  1. 弱一致性:ConcurrentHashMap在创建完迭代器后,会按照哈希表中反映的结构迭代元素。如果结构的变化发生在已经迭代过的元素上,则结构性变化不会反映出来。反之,若结构变化发生在未发生改变的元素上,结构变化就可以被反映出来。这种现象就称之为弱一致性;

基于SkipList的Map和Set

  1. 特点:所有的操作都可以并行,包括读和写;
  2. 与TreeMap/TreeSet一样,可以实现按键的自然排序;使用方法如下:
1
2
3
4
5
6
7
8
9
public void CurrentSkipMapDemo {
public static void main(String[] args) {
Map<String, Object> map = new ConcurrentSkipMap<>(Collections.reverseOrder());
map.put("a", "abstract");
map.put("b", "basic");
map.put("c", "call");
System.out.println(map.toString());
}
}
  1. SkipMap和SkipSet是基于跳表实现,跳表是基于链表实现的。在链表的基础上增加了多层索引结构。跳表实际上一个类似于二分查找的数据结构。因此有了这样的数据结构就可以更好地实现,关于其复杂度就不再赘述。

并发队列

  1. Java中的并发队列一般包括下列几种:
  • 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
  • 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque;
  • 优先级阻塞队列:PriorityBlockingQueue;
  • 延时阻塞队列:DelayQueue;
  • 其他阻塞队列:SynchronousQueue和LinkedTransferQueue;
  1. 这里的无锁指的是不使用锁,使用CAS方式实现;
  2. 普通阻塞队列适用于生产者/消费者模式;
  3. 优先级队列是按照优先级出队列的,优先级高的先出
  4. 延时阻塞队列可以用于定时任务,按照元素的延时时间出队,其特殊点在于只有当元素的延时过期之后才能从队列中被拿走
  5. SynchronousQueue没有存储空间,适用于两进程之间传递信息;LinkedTransferQueue适用于一些消息传递类型的应用中。

异步执行任务

执行接口

  • Runnable(没有返回结果)和Callable(有返回结果):表示要执行的异步任务
  • Executor和ExecutorService:表示执行服务
  • Future:表示异步任务的结果

Future接口的使用

  1. 示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class BasicDemo {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sleepSeconds = new Random().nextInt(1000);
Thread.sleep(sleepSeconds);
return sleepSeconds;
}
}
public static void main(String[] args) throws InterruptedException{
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new Task());
Thread.sleep(100);
try {
System.out.println(future.get());
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}
  1. 实现原理:ExecutorService的主要实现类是ThreadPoolExecutor,是基于线程池实现的。
  2. 总结:并发包执行任务执行服务体现了并发异步开发中的“关注点分离”的思想,使用者只需要通过ExecutorService提交任务,通过Future操作任务和结果即可,不需要关注线程创建和协调的细节。

线程池

  1. 线程池主要有任务队列以及工作者线程组成;
  2. 线程池有下列优点:
  • 可以重用线程,避免线程创建的开销;
  • 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争;
  1. Java中最常用的线程池类是ThreadPoolExecutor;其中最重要的4个参数是:corePoolSize(核心线程个数),maximumPoolSize(最大线程个数)keepAliveTime和unit(空闲线程存活时间)
  2. ThreadPoolExecutor中的队列均为阻塞队列,分为有界和无界两种类型。需要说明的是如果使用了无界队列,线程个数最多只能达到corePoolSize,达到corePoolSize之后新的任务总会排队,参数maximumPoolSize也就没有意义了。
  3. 当队列有界并且maximumPoolSize有限时,只要队列排满,新任务总会被拒绝,因此也会出发任务拒绝策略。不过任务拒绝策略是可以自定义的。
  • ThreadPoolExecutor.AbortPolicy:默认方式,抛出异常
  • ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛出异常也不执行
  • ThreadPoolExecutor.DiscardOldestPolicy:扔掉等待时间最长的任务,然后自己排队
  • ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,不交给线程池中的线程执行

定时任务

  1. TimerTask表示一个定时任务,具体的定时任务需要继承该抽象类,实现run方法。示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class BasicTimer {
static class DelayTask extends TimerTask {
@Override
public void run() {
System.out.println("delayed task");
}
}
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new DelayTask(), 1000);
Thread.sleep(2000);
timer.cancel();
}
}
  1. Timer内部主要是由任务队列和Timer线程两部分组成。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。一个Timer对象只有一个Timer线程。
  2. 对于固定延时任务,延时相对的是任务执行前的当前时间;对于固定频率任务,延时相对的是最优先计划;
  3. 在执行一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有的定时任务都会被取消。因此,如果希望各个定时任务不互相干扰,一定要在run方法内捕获异常。
  4. 并发包中使用ScheduledExecutorService来实现定时任务,其为一个接口,多线程执行定时任务的示例如下:
1
2
3
4
5
6
7
8
9
public class ScheduledFixedDelay {
static class LongRunningTask implements Runnable {}
static class FixedDelayTask implements Runnable {}
public static void main(String[] args) {
ScheduledExecutorService timer = Executors.newScheduledThreadPool(10);
timer.schedule(new LongRunningTask(), 10, TimeUnit.MILLISECONDS);
timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000, TimeUnit.MILLISECONDS);
}
}
  1. ScheduledExecutorService的实现原理如下:
  • 实现背后是线程池,可以有多个线程执行任务;
  • 在任务执行后再设置下次执行的时间,对于固定延时的任务更合理;
  • 任务执行线程会捕获任务执行过程中的所有异常。一个定时任务的异常不会影响其他定时任务;

同步以及协作工具类

  1. 同步工具类包括:
  • ReetrantReadWriteLock:读写显式锁
  • Semaphore:信号量
  • CountDownLatch:倒计时门栓
  • CyclicBarrier:循环栅栏
  • ThreadLocal:线程本地 2.ReetrantReadWriteLock读写显式锁:只有“读-读”可以并行,“读-写”和“写-写”都是不可以的。只有一个线程可以进行写操作。是基于CAS实现的;
  1. Semaphore:限制并发数量,是基于AQS实现的;
  2. CountDownLatch:默认是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过。门栓是一次性的,打开了就不能再关上了。
  3. CyclicBarrier:所有线程在到达该栅栏后都需要等待其他线程,等待所有线程都到达后在一起通过。它是循环的,可以用重复的同步。
  4. ThreadLocal:每个线程都有同一个变量的独有拷贝,也就是说每个线程都拥有一个属于自己的独立值。一般来说ThreadLocal都定义为静态类型便于引用

并发总结