Java编程原理——并发
并发基础
- 线程表示一条单独的执行流,它有自己的执行计数器,有自己的栈;
- 使用线程的方式有两种,继承Thread类以及实现Runnable接口,两种达到的效果是一样的。
- 线程具有下列状态:
- NEW:没有调用start方法线程状态;
- TERMINATED:线程运行结束后状态;
- RUNNABLE:调用start后线程执行run方法且没有阻塞时状态;
- BLOCKED、WAITING、TIMED_WAITING:线程被阻塞;
- join方法:可以让调用join方法的线程等待该线程结束;
- 竞态条件:当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确;
synchronized关键字
- synchronized实例方法实际保护的是同一个对象的方法调用,每次只能被一个线程持有;
- synchronized关键字保护的是对象,针对实例对象保护的是当前,对于静态方法保护的类对象;
- 被synchronized关键字保护的对象都维护着1个锁以及等待队列;
- synchronized具有下列特点:
- 可重入性:对于同一个执行线程在获得锁之后,在调用其他需要同样锁的代码时,可以直接调用;
- 内存可见性:释放锁时会将所有的更改写回内存;但是开销有些过高,可以使用volatile关键字实现相同的效果;
- 死锁:尽量避免在持有一个锁的同时再去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁;(使用开发过程中可以使用JDK提供的jstack检测死锁)
- 对于同步容器对象,单个操作是线程安全的,迭代却不是线程安全的。(因为在多线程操作中容易对容器产生结构性变化);
- 如果需要在迭代容器的过程中保证线程安全,需要在遍历整个容器的过程中给整个容器对象加锁;
- 除11中所写到的方法,推荐使用并发容器类:
- CopyOnWriteArrayList
- ConcurrentHashMap;
- CocurrentLinkedQueue;
- ConcurrentSkipSet;
线程协作机制
- 生产/消费者模式:生产者生产数据并放至消费队列中,消费者从消费队列中消费数据,如果队列为空消费者等待,如果队列满,生产者等待;
- 同时开始:在模拟仿真程序中,多个线程同时开始;
- 等待结束:主线程将任务拆解成多个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕;
- 异步结果:将子线程封装为异步调用,子线程在正确返回结果前先返回一个Future对象,通过Future对象可以获得子线程最终的执行结果;
- Map/Reduce操作:也需要使用线程进行操作;
wait/notify机制:
- 每个同步对象都拥有一个锁以及一个等待队列,当调用wait方法,系统会将对象放入等待队列,直到调用notify方法再将其从等待队列中取出。
- 设计多线程程序的时候需要想清楚共享变量和条件,这是设计多线程协作的核心。
- 以生产者/消费者模式为例:生产者生产产品,并放入缓冲队列;消费者从缓冲队列取出产品进行消费。当生产者生产速度过快导致缓冲队列满了,此时生产者应停止生产。同样,当生产者生产过慢导致缓冲队列空,此时消费者应停止消费。等待生产者生产。
- 同时进行:一个主线程以及N个子线程,当主线程发送指令后,子线程开始同时执行;
- 各个线程分头行动,给咱达到一个集合点,在集合点需要集齐所有线程交换数据。然后再进行下一步动作。
取消/关闭线程机制
- 停止一个线程的手段主要是中断,中断线程的目的并不是强迫线程终止,而是给线程发送一个停止信号,由线程自行决定中断的时机。本质上依旧是一种协作机制。
- Java中关于中断的方法有如下三种:
| public boolean isInterrupted(); public void interrupt(); public static boolean interrupted();
|
- 对于以线程提供的服务的应用程序模块而言,应该封装取消/关闭操作,提供单独的取消/关闭方法给调用者,外部调用库应该调用这些方法而不是直接调用interrupt方法;
并发包基石
原子变量
- 原子变量即为操作一次性完成,期间不进行线程上下文切换,并且能保证线程安全的原子变量。
- 原子变量有AtomicBoolean, AtomicInteger, AtomicLong以及AtomicReference(原子引用类型)类型
- 之所以称之为原子变量,是因为其包含一些原子方法实现组合操作的方法
- compareAndSet方法,也就是常说的CAS。如果当前值等于expect,则更新为update,否则不更新。如果更新成功,返回true,否则返回false。
- 乐观锁与悲观锁的区别:乐观锁假设每次操作都是不会出现冲突,因此在更新时不管是否出现冲突都会先进行一次更新,即便冲突了只需要再检测一次即可;而别管锁假设每次操作都会产生冲突,因此每次操作前都需要获得锁才能继续操作(例如:synchronized关键字);
- 使用CAS会出现一个ABA问题:假设当前值为A,如果另一个线程将A改为B然后再修改回A,当前线程的CAS操作是无法分辨当前值发生过变化的。解决ABA可以在线程修改值时加上一个时间戳,以此来判断当前线程持有变量的版本;
1 2 3 4 5
| Pair pair = new Pair(100, 200); int stamp = 1; AtomicStampedReference<Pair> pairRef = new AtomicStampedReference<>(pair, stamp); int newStamp = 2; pairRef.copareAndSet(pair, new Pair(200, 200), stamp, newStamp);
|
- 总结:CAS是并发包的基础,基于它可以实现高效、乐观、非阻塞式数据结构的算法,它也是并发包中锁、同步工具以及各种容器的基础;
显式锁
- 包括锁Lock,主要实现类为ReentrantLock;读写锁接口ReadWriteLock,主要实现类是ReetrantReadWriteLock;
- 显式锁一般有下列方法:
- lock()/unlock():普通获取锁和释放锁方法;会阻塞直至成功;
- lockInterruptibly():可以响应中断,如果被其他线程中断会抛出InterruptedException;
- tryLock():只是尝试获取锁,立即返回;如果获取成功返回true,否则返回false;
- newCondition():新建一个条件
- 相比于synchronized,显式锁支持非阻塞方式获取锁、可以响应中断、可以限时;
- Lock是可重入的(一个线程持有一个锁的前提下可以继续获得该锁)、可以解决竞态条件、可以保证内存的可见性(volatile)
- 保证竞态条件的公平性会影响性能,一般情况下我们不作保证;例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Counter { private final Lock lock = new ReetrantLock(); private volatile int count; public void incr() { lock.lock(); try { count++; } finally { lock.unlock(); } } public int getCount() { return count; } }
|
- 使用tryLock()可以避免死锁,在持有一个锁获另一个锁而获取不到的时候,可以释放自己已持有的锁,给其他线程获得锁的机会,然后重试获取所有锁;例如下面这样写会产生死锁,使用tryLock()判断可以避免死锁问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class Account { private Lock lock = new ReetrantLock(); private volatile double money; public Account(double money) { lock.lock(); try { this.money += money; } finally { lock.unlock(); } } public void reduce(double money) { lock.lock(); try { this.money -= money; } finally { lock.unlock(); } } public double getMoney() { return money; } void lock() { lock.lock(); } void unlock() { lock.unlock(); } boolean tryLock() { return lock.tryLock(); } }
public class AccountMgr { public static class NoEnoughMoneyException extends Exception{} public static void transfer(Account from, Account to, double money)throws NoEnoughMoneyException { from.lock(); try { to.lock(); try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { throw new NoEnoughMoneyException(); } finally { to.unlock(); } } finally { from.unlock(); } } } }
|
- Java中封装了一个抽象类AQS用于简化并发工具类的实现。AQS全称(AbstractQueuedSynchronizer)。AQS的实现比较复杂。AQS内部维护了一个等待队列,借助CAS方法实现了无阻塞算法进行更新。
- Lock锁的基本原理如下:能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁则返回;否则继续等待,这个过程如果发生了中断,lock会记录中断标志位,但不会提前返回或者是抛出异常;
- 保证公平性的结果是整体性能会降低,低的原因不在于检查速度慢,而是会让活跃线程得不到锁,进入等待状态,引起频繁上下文切换,降低整体的效率。需要说明的是即便fair参数为true,ReetrantLock中不带参数的tryLock方法也是不保证公平的。
- synchronized关键字是声明式编程,而Lock是命令式编程,所有内部的细节都需要自己实现。目前JVM对synchronized的优化已经相当的好。因此能用synchronized的地方建议使用synchronized,不能使用synchronized关键字的地方再考虑Lock;
显式条件
- 锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronized相对应,显式条件和wait/notify相对应。
- 实现显式锁的显式条件为Condition类,该类为一个接口,其中包含下列方法:其中绝大部分是响应中断的。即:当出现中断会清空标志位:
1 2 3 4 5 6 7
| void await() void awaitUnInterruptibly(); long awaitNanos(long nanoTimeout); boolean await(long times, TimeUnit unit); boolean awaitUntil(Date deadline); void signal(); void signalAll();
|
- await在进入等待队列后,会释放锁以及CPU,当其他线程将它唤醒后、等待超时后以及发生中断退出后都需要重新获得锁,获取锁后才从await方法中退出。
- 使用显式锁的示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class WaitThread extends Thread { private volatile boolean fire = false; private Lock lock = new ReetrantLock(); private Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); try { while(!fire) { condition.await(); } } finally { lock.unlock(); } System.out.println("fired"); } catch (InterruptedException e) { Thread.interrupted(); } } public void fire() { lock.lock(); try { this.fire = true; condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException{ WaitThread waitThread = new WaitThread(); waitThread.start(); Thread.sleep(1000); System.out.println("fire"); waitThread.fire(); } }
|
- await()/signal()方法与显式锁配合使用;而wait()/notify()要与synchronized方法使用,不能使用混淆。否则即便编译器不报错也会出现IllegalStateException方法。
并发容器
CopyOnWriteArrayList以及CopyOnWriteArraySet
- 特点:线程安全,可以被多个线程并发访问;
- 迭代器不支持修改操作,但也不会抛出ConcurrentModificationException;
- 以原子方式支持一些复合操作
- 写时复制的技术思路为:每次出现修改操作时,都会新建一个数组,复制原数组内容到新数组,在新数组上进行需要的修改,然后一原子方式设置内部数组引用。同理:所有的读操作都是先拿到当前引用的数组,然后直接访问该数组。在读的过程中,可能内部的数组引用已经被修改了,但不会影响操作,它依旧访问原数组内容。
- 截至当前保证线程安全的思路有三种:
- 使用锁:例如synchronized关键字以及ReetrantLock;
- 使用CAS:
- 写时复制也是一种常见的线程安全保证思路,在操作系统内部的进程管理和内存管理经常会使用到。
- 总结:CopyOnWriteList和CopyOnWriteSet适用于读远多于写、集合不太大的场景,它们采用了写时复制技术,这也是计算机中一种重要的思维和技术。
ConcurrentHashMap
- 特点:并发安全、支持一些原子复合操作、支持高并发,读完全并行、弱一致性;
- HashMap不是线程安全的,在高并发场景下容易出现死循环以及占满CPU的问题;
- Collections.synchronizedMap()在调用一些更新操作时需要加锁,而ConcurrentHashMap()就不存在这个问题;
- ConcurrentHashMap使用了很复杂的技术实现高并发,但是简要概括下有下列两种:
- 分段锁:将读取的数据分为多段,每段都拥有自己的锁,每个段都相当于独立的Hash表。使用分段锁可以有效地提高并发效率。
- 读不需要锁:对于写操作需要获取锁,不能并行,但是读操作可以并行。写的同时也可以读。
- 弱一致性:ConcurrentHashMap在创建完迭代器后,会按照哈希表中反映的结构迭代元素。如果结构的变化发生在已经迭代过的元素上,则结构性变化不会反映出来。反之,若结构变化发生在未发生改变的元素上,结构变化就可以被反映出来。这种现象就称之为弱一致性;
基于SkipList的Map和Set
- 特点:所有的操作都可以并行,包括读和写;
- 与TreeMap/TreeSet一样,可以实现按键的自然排序;使用方法如下:
1 2 3 4 5 6 7 8 9
| public void CurrentSkipMapDemo { public static void main(String[] args) { Map<String, Object> map = new ConcurrentSkipMap<>(Collections.reverseOrder()); map.put("a", "abstract"); map.put("b", "basic"); map.put("c", "call"); System.out.println(map.toString()); } }
|
- SkipMap和SkipSet是基于跳表实现,跳表是基于链表实现的。在链表的基础上增加了多层索引结构。跳表实际上一个类似于二分查找的数据结构。因此有了这样的数据结构就可以更好地实现,关于其复杂度就不再赘述。
并发队列
- Java中的并发队列一般包括下列几种:
- 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
- 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque;
- 优先级阻塞队列:PriorityBlockingQueue;
- 延时阻塞队列:DelayQueue;
- 其他阻塞队列:SynchronousQueue和LinkedTransferQueue;
- 这里的无锁指的是不使用锁,使用CAS方式实现;
- 普通阻塞队列适用于生产者/消费者模式;
- 优先级队列是按照优先级出队列的,优先级高的先出
- 延时阻塞队列可以用于定时任务,按照元素的延时时间出队,其特殊点在于只有当元素的延时过期之后才能从队列中被拿走
- SynchronousQueue没有存储空间,适用于两进程之间传递信息;LinkedTransferQueue适用于一些消息传递类型的应用中。
异步执行任务
执行接口
- Runnable(没有返回结果)和Callable(有返回结果):表示要执行的异步任务
- Executor和ExecutorService:表示执行服务
- Future:表示异步任务的结果
Future接口的使用
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class BasicDemo { static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { int sleepSeconds = new Random().nextInt(1000); Thread.sleep(sleepSeconds); return sleepSeconds; } } public static void main(String[] args) throws InterruptedException{ ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> future = executor.submit(new Task()); Thread.sleep(100); try { System.out.println(future.get()); } catch (Exception e) { e.printStackTrace(); } executor.shutdown(); } }
|
- 实现原理:ExecutorService的主要实现类是ThreadPoolExecutor,是基于线程池实现的。
- 总结:并发包执行任务执行服务体现了并发异步开发中的“关注点分离”的思想,使用者只需要通过ExecutorService提交任务,通过Future操作任务和结果即可,不需要关注线程创建和协调的细节。
线程池
- 线程池主要有任务队列以及工作者线程组成;
- 线程池有下列优点:
- 可以重用线程,避免线程创建的开销;
- 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争;
- Java中最常用的线程池类是ThreadPoolExecutor;其中最重要的4个参数是:corePoolSize(核心线程个数),maximumPoolSize(最大线程个数)keepAliveTime和unit(空闲线程存活时间)
- ThreadPoolExecutor中的队列均为阻塞队列,分为有界和无界两种类型。需要说明的是如果使用了无界队列,线程个数最多只能达到corePoolSize,达到corePoolSize之后新的任务总会排队,参数maximumPoolSize也就没有意义了。
- 当队列有界并且maximumPoolSize有限时,只要队列排满,新任务总会被拒绝,因此也会出发任务拒绝策略。不过任务拒绝策略是可以自定义的。
- ThreadPoolExecutor.AbortPolicy:默认方式,抛出异常
- ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛出异常也不执行
- ThreadPoolExecutor.DiscardOldestPolicy:扔掉等待时间最长的任务,然后自己排队
- ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,不交给线程池中的线程执行
定时任务
- TimerTask表示一个定时任务,具体的定时任务需要继承该抽象类,实现run方法。示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class BasicTimer { static class DelayTask extends TimerTask { @Override public void run() { System.out.println("delayed task"); } } public static void main(String[] args) throws InterruptedException { Timer timer = new Timer(); timer.schedule(new DelayTask(), 1000); Thread.sleep(2000); timer.cancel(); } }
|
- Timer内部主要是由任务队列和Timer线程两部分组成。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。一个Timer对象只有一个Timer线程。
- 对于固定延时任务,延时相对的是任务执行前的当前时间;对于固定频率任务,延时相对的是最优先计划;
- 在执行一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有的定时任务都会被取消。因此,如果希望各个定时任务不互相干扰,一定要在run方法内捕获异常。
- 并发包中使用ScheduledExecutorService来实现定时任务,其为一个接口,多线程执行定时任务的示例如下:
1 2 3 4 5 6 7 8 9
| public class ScheduledFixedDelay { static class LongRunningTask implements Runnable {} static class FixedDelayTask implements Runnable {} public static void main(String[] args) { ScheduledExecutorService timer = Executors.newScheduledThreadPool(10); timer.schedule(new LongRunningTask(), 10, TimeUnit.MILLISECONDS); timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000, TimeUnit.MILLISECONDS); } }
|
- ScheduledExecutorService的实现原理如下:
- 实现背后是线程池,可以有多个线程执行任务;
- 在任务执行后再设置下次执行的时间,对于固定延时的任务更合理;
- 任务执行线程会捕获任务执行过程中的所有异常。一个定时任务的异常不会影响其他定时任务;
同步以及协作工具类
- 同步工具类包括:
- ReetrantReadWriteLock:读写显式锁
- Semaphore:信号量
- CountDownLatch:倒计时门栓
- CyclicBarrier:循环栅栏
- ThreadLocal:线程本地 2.ReetrantReadWriteLock读写显式锁:只有“读-读”可以并行,“读-写”和“写-写”都是不可以的。只有一个线程可以进行写操作。是基于CAS实现的;
- Semaphore:限制并发数量,是基于AQS实现的;
- CountDownLatch:默认是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过。门栓是一次性的,打开了就不能再关上了。
- CyclicBarrier:所有线程在到达该栅栏后都需要等待其他线程,等待所有线程都到达后在一起通过。它是循环的,可以用重复的同步。
- ThreadLocal:每个线程都有同一个变量的独有拷贝,也就是说每个线程都拥有一个属于自己的独立值。一般来说ThreadLocal都定义为静态类型便于引用
并发总结