多线程&并发编程知识点汇总

2022-10-16 14:29:35

引论

(思考问题)–多线程为什么快&线程数量越多越好? 多线程如何设置参考线程数量合理? 多线程安全通信有哪些锁? 高并发容器如何选择使用?线程池如何选择和设置值?

一. 多线程并发介绍

1.1线程的历史

线程的历史是一部对于CPU性能压榨的历史:CPU的执行时间远远小于数据准备时间

1.单进程人工切换-纸带机

2.多进程批处理-多个任务批量执行

3.多进程并行处理-程序写在不同的内存位置上来回切换

4.多线程-一个程序内部不同任务的来回切换(OS管理)

5.纤程/协程- 绿色线程,用户管理的(非OS管理)的线程【节省用户态到内核态切换】

1.2 计算机组成&线程并发

计算机组成:CPU工作职责就是不断取指令进行运算,做完运算之后将结果放到内存中

image-20220128140552180

1.3线程切换(OS)

T1和T2的切换过程:

1.CPU将T1的数据&指令从内存分别加载到寄存器&程序计数器,通过算术逻辑单元(ALU)进行计算,计算结果放到内存

2.通过OS调度算法来控制T1和T2的切换执行,Context上下文切换需要消耗资源

3.切换时候:T1 寄存器数据&指令数据 → 放到cache , T1 寄存器数据&指令数据 加载到CPU通过(ALU)进行执行

image-20220128140714380

1.4 理解并发

1.生活并发的现象

1.电源的发光(频率-不是一直亮)
2.电影的播放(频率-图片一直切换)
3.多线程不一定并发执行,并发执行一定是多线程

2.并发的缺点

1.线程的安全问题

2.线程的活跃问题(死锁:抢不到资源无法执行 饥饿:抢到资源的一直不走,一直无法抢到资源)

3.线程的性能问题(线程的上下文切换是需要时间开销)

3.并发的优点

1.充分利用多核CPU的计算能力,方便进行业务拆分,提升应用性能
2.另外一种总结:在速度、设计、资源利用三个方面有很大的优势

(1).速度:同时处理多个请求,响应更快;复杂的操作可以分成多个进程(或线程)同时进行
(2).设计:程序设计在某些情况下更简单,也可以有更多的选择
(3).资源利用:CPU能够等待IO的时候能够做一些其他的事情

4.减少上下文切换方式

  1. 无锁并发编程:可以参照concurrentHashMap锁分段的思想,不同的线程处理不同段的数据,这样在多线程竞争的条件下,可以减少上下文切换的时间
  2. **CAS算法:**利用Atomic下使用CAS算法来更新数据,使用了乐观锁,可以有效的减少一部分不必要的锁竞争带来的上下文切换
  3. 使用最少线程:避免创建不需要的线程,比如任务很少,但是创建了很多的线程,这样会造成大量的线程都处于等待状态
  4. 协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换

1.5 线程的状态转换

线程的几种状态:创建、就绪、运行、阻塞、死亡

线程的状态转换

1.6 线程的数量应该如何设置

1.单核CPU设定多线程是否有意义?答案:有意义
【1】某个线程执行操作时候需要IO输入和输出,此时处于等待状态则可以让其它线程进行执行【2】某个线程sleep的时候使用多线程

【3】CPU密集型:大量的时间在做计算 IO密集型 :大量的时间在等待IO的输入输出

2.工作线程是不是设置数量越大越好?答案:不是
【1】如果线程太多则大量时间浪费在线程的切换上面导致性能降低

3.是不是按照操作系统的核数来设置就可以?答案:不是
【1】如果操作系统还跑着其它进程是不好确定的,所以最好按照压测来确定,但是一般情况下可以设置操作系统核数作为参考

【2】操作系统的线程:分配资源的线程处于安全的考虑,对CPU需要留下些余量,不能完全使用100%,然后造成操作系统CPU崩溃一般设置80%

线程数量的计算公式

线程数计算公式

4.WC 多少时间在进行等待,多少时间在计算?答案:通过工具&测试

【1】单机情况: 通过性能分析工具进行测试JProfiler ,生产环境用阿里 Arthas

【2】分布式情况:如果是分布是调用链路,则需要更多分析工具&测试

5.多线程一定块于单线程吗??答案:不是

【1】线程间的切换是需要很大的开销的

【2】线程的上下文环境需要记录和读取

【3】.举例:垃圾收集器:serial收集器(单线程) 、pamnew(多线程)、parallel(多线程)、cms(多线程)、g1收集器(多线程)

1.7 程序&进程&线程

程序:QQ.exe作为一个文件放到内存中进行执行,QQ.exe是一个程序

进程:

【1】运行中的程序,是操作系统级别的,线程是进程基本的单位;

【2】 操作系统资源分配的最小基本单位(分配内存,IO端口号,文件描述符socket)

【3】进程中包含多个线程,线程间共享进程的资源;静态概念

线程:

【1】是处理器调度的基本单位;

【2】 同一个程序内部不同的执行路径;

【3】进程中包含多个线程,线程间共享进程的资源;动态概念

二. 高并发锁

2.1 高并发锁描述&应用场景

名称描述说明应用注意说明
ReentrantLockReentrantLock实现独占锁的功能,ReentrantLock相比Synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景``1.synchronized锁定遇到异常JVM会自动释放锁,ReentrantLock必须手动释放锁,因此经常在finally中进行锁的释放2.tryLock尝试锁定不管锁定与否方法都将继续执行,需要根据tryLock返回值来判定是否锁定。也可以指定tryLock时间,由于tryLock(time)抛出异常,所以注意unclock处理必须放到finally中3.synchronized在wait()时候必须是由notify进行唤醒,reentrantLock调用lockInterruptibly方法,可以对线程interrupt方法做出响应在一个线程等待锁的过程中,可以被打断4.synchronized:只有非公平锁 reentrantLock:可以指定公平锁5.reentrantLock可出现多个condition代表的是不同的等待队列synchronized 做不到这种效果 6.reentrantLock底层实现是CAS synchronized 默认4种状态锁的升级
CountDownLatch锁计数器典型应用场景就是启动一个服务时主线程需要等待多个组件加载完毕,之后再继续执行某一线程运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
CyclicBarrier篱栅屏障点``会议通知:多少人到齐了再开会&或者多少个线程做完某个任务再执行特定的任务Countdownlatch & CyclicBarrier: Countdownlatch可以是1个线程不停countdown CyclicBarrier:是必须要有多少个线程到齐了再去做某件事情
Phaser阶段执行任务分阶段执行:不同阶段执行的任务和线程不同 整个程序分成3个阶段,某些线程执行到第一个阶段停止,某些线程执行到第二个阶段停止,某些线程执行到所有阶段完成后停止,整个执行过程中可以线程注册&解除注册 phaser.arriveAndAwaitAdvance();线程注册 phaser.arriveAndDeregister(); 解除注册 phaser.register() 注册新的线程
ReadWriteLock读写锁相比于 ReentrantLock 适用于一般场合,ReadWriteLock 适用于读多写少的情况,合理使用可以进一步提高并发效率。ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readWriteLock.readLock(); 读锁 readWriteLock.writeLock();写锁读写锁:排它锁和共享锁 –写和写互斥,写和读互斥,只有读取和读取是同时进行的
StampedLock乐观读写锁解决写线程的饥饿问题,只有读和读不是互斥的,解决写线程获取不到CPU时间片的问题 1.读锁不会阻塞写锁,当读取数据的时候发生了写的操作,应该重新读取和获取数据。 2.采用乐观锁的机制,检测版本和Stamped是否是读取时候值,如果是则进行操作,否则重新获取数据进行操作。 3.支持读锁转换成写锁,支持写锁转换成读锁 4.long stamp = lock.writeLock();lock.unlockWrite(stamp); --写锁和读锁都具有返回值 5.读写性能远远大于ReentrantLockStampedLock提供了乐观读锁取代ReadWriteLock以进一步提升并发性能;是不可重入锁。
Semaphore信号量p限流:同一时刻只允许多少线程工作Semaphore s = new Semaphore(2, true); true:表示线程采用公平锁 false:默认是不公平锁
Exchanger数据交换``两个线程执行任务进行数据交换:Exchanger
LockSupport锁工具类(让某个线程继续运行-让某个阻塞线程继续运行)LockSupport.park(); 让当前线程阻塞 lockSupport.unpark(t); -叫醒某个指定线程 lockSupport 和 wait,notify区别 lockSupport可以叫醒某个指定线程,notify不可以 lockSupport.unpark可以在lockSupport.park()前调用可以起作用, notify不能在wait前调用,不能起作用
synchronized锁升级概念:无锁、偏向锁、自旋锁、重量级锁 偏向锁:markword 记录这个线程ID 自旋锁:如果线程争用:升级为 自旋锁(CAS操作),自旋锁是在用户态进行的,不会进入内核态,所以效率和性能都是比较高 重量级锁:10次以后,升级为重量级锁- OS(操作系统锁)执行时间长或者执行线程多的时候用OS系统锁 执行时间短并且执行线程少的用CAS(自旋锁)synchronized优化 同步代码块中的语句越少越好,用对象锁的时候加final关键字自旋锁占用CPU内存,系统锁放入等待队列 synchronized 和 reentrantLock有什么区别: 1.synchronized系统自带,不用管加锁和释放,reentrantLock需要手动加锁和释放锁 2.reentrantLock可出现多个condition, condition代表的是不同的等待队列 Synchronized 做不到这种效果 3.reentrantLock底层实现是CAS Synchronized 默认4种状态锁的升级
volatilevolatile作用: 1.保证线程可见性【通过MESI缓存一致性协议进行实现】 2.禁止指令重排序,禁止的是JVM的指令重排序,不是CPU内核的指令重排序 MESI 协议是以缓存行(CPU缓存的基本数据单位)的几个状态来命名的(Modified、Exclusive、 Share or Invalid)。该协议要求在每个缓存行上维护两个状态位,使每个数据单位处于M、E、S和I这四种状态之一,各种状态含义如下: M:被修改的。处于这一状态的数据,只在本CPU中有缓存数据,而其他CPU中没有。其状态相对于内存中的值来说,已经被修改,且没有更新到内存中。 E:独占的。处于这一状态的数据,只有在本CPU中有缓存,且其数据没有修改,即与内存中一致。 S:共享的。处于这一状态的数据在多个CPU中都有缓存,且与内存一致。 I:无效的,非法的。本CPU中的这份缓存无效。volatile 为什么保障了可见性 volatile关键字,使一个变量在多个线程间可见 A B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道,使用volatile关键字,会让所有线程都会读到变量的修改值,将会强制所有线程都去堆内存中读取值,volatile并不能保证多个线程共同修改变量时所带来的不一致问题,也就是说volatile不能替代synchronized

2.2 CAS

CPU原语支持,中间过程不能被打断 (无锁优化-自旋 compare and set)

CAS的ABA问题:
可以用cas(version)解决、可以用cas(time)时间戳问题

如果是引用对象有问题引用传递、基础数据对象无问题值传递

2.3 AQS

1.AQS内部队列:装Node的双向链表队列容器,Node里面有thread属性(volatile Thread thread)

2.volatile int state:表示线程间是可见,state是什么意思,用实现的子类定义,state一般表示某个线程加锁和解锁重入的次数

3.队列里面的线程看state=0?,如果是则取争抢锁,如果抢不到则回到队列,—如果按照队列里面的顺序去获取锁,不抢的话就是公平锁,抢锁的方式采用CAS方式进行判断验证

4.操作队列的时候,队列里面只有第二个节点,去看head节点释放释放锁(head节点表示获取到锁的工作节点),后面的节点都在队列里面排队等着;

AQS核心:利用cas 操作双写链表, 替代了sync操作

AQS

三. 高并发容器

3.1 JAVA有4种引用:强、软、弱、虚

普通引用:强引用
特点:只要引用指向该对象,垃圾回收器绝不回收

软引用:用来描述一些还有用但并非必须的对象
应用场景:缓存使用当系统运行空间够用的时候,该对象放置在内存里面,系统空间内存不够用的时候将其自动删除掉

分配一个数组,heap将装不下,这时候系统会垃圾回收,先回收一次,如果不够,会把软引用干掉;

对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围进行第二次回收,如果这次回收还没有足够的内存,才会抛出内存溢出异常。

弱引用:遭到GC就会回收
m ->弱引用 -》 new M()对象

作用:一般用在容器里面(WeakHashMap) ThreadLocal tl = new ThreadLocal<>(); tl.set(new M()); tl.remove();

t1通过强引用指向ThreadLocal ->(Thread)ThreadLocalMap → ThreadLocalMap(Entry)的key 弱引用指向ThreadLocal

为什么Entry要用弱引用:
如果是强引用,即使tl=null,但key的引用依然指向ThreadLocal对象(当前运行的线程一直没有消失),所以会内存泄漏,使用弱引用则遭到GC就会回收则避免内存泄漏

如果key的值成为NULL(即是运行线程为NULL-弱引用不存在),而导致整个value不会被回收,仍然存在内存泄漏情况 解决方案:t1.remove(); threadLocal没有使用的时候强制调用remove()

虚引用:写JVM用
PhantomReference phantomReference = new PhantomReference<>(new M(), QUEUE);

创建虚引用的时候传入2个对象:1个是指向的对象,1个事指针的队列,当虚引用被垃圾回收机制回收的时候,会调用队列里面的QUEUE.poll方法,poll方法本身是一个阻塞的方法

弱引用和虚引用的区别:1.虚引用完全无法获取指向对象的值 2.弱引用可以获取到指向对象的值

虚引用作用:netty的时候直接内存
DirectByteBuffer: 直接内存(堆外内存)不被堆管理,垃圾回收不能管理,不能被虚拟机进行回收,只有操作系统能管理;
当对象被回收时候,通过QUEUE会检测到,然后调用JAVA代码清理堆外内存;利用unsafe进行回收

3.2 ThreadLocal

1.ThreadLocal线程局部变量
2.ThreadLocal是使用空间换时间,synchronized是使用时间换空间
3.threadLocal存储的数据,虽然是1个变量,但是对于不同的线程是不可见的,所以2个线程间修改数据大家相互是不知道的,使用的是空间换时间提高效率

ThreadLocal底层如何实现:Thread作为key放入到一个map中,获取时候通过Thread作为Key直接从Map中获取
map中再嵌套1个map,存放某个线程自己的map里面 ,map在Thread.currentThread.map(ThreadLocal,person),设置到了当前线程的map中

用途:声明式事务,保障1个线程使用的是同一个collection,保障是同一连接保障事务;
将collection放入到threadlocal里面,只要同一个线程在执行调用方法的时候保障是同一个线程,才能保障多个方法在执行数据库事务的时候能够进行事务处理。

3.3 并发容器

名称描述说明应用注意说明
ListCopyOnWriteList、Vector、Stack、ArrayList、LinkedList
SetHashSet、 LinkedHashSet、 SortedSetTreeSet 、EnumSet 、 CopyOnWriteArraySet 、 ConcurrentSkipListSet跳表解决链表查询慢的问题
MapHashmap、 Treemap 、WeakHashMap、 IdentityHashMap、ConcurrentMapConcurrentMap1.多线程读取时候更安全更快 2.插入的时候经过各种判断(分段锁,链表,红黑树)、插入时候效率相对要低
非阻塞QueueArrayQueue、LinkedQueue非阻塞式的队列
阻塞QueueBlockingQueue可阻塞式的队列
ArrayBlockingQueue有界的阻塞队列
PriorityBlockingQueue优先级控制队列用户自定义优先级别策略 (排好顺序的队列)
LinkedBlockingQueue无界的阻塞队列
ConcurrentLinkedQueue采用CAS实现安全队列
DelayQueue时间上排序的阻塞队列按时间顺序任务调度(按照在队列里面的等待时间进行排序)
TransferQueue多个任务传递队列使用场景:交替任务执行 & 一个一个命名发送必须等待结果后执行下个命令线程a往队列里面装数据,装好数据后线a不能立即走,必须等线程b取数据,只有线程b将数据取走后,线程a才能继续工作;
LinkedTransferQueue链表实现多个任务传递数据
SynchronoursQueue线程间传递任务内容使用场景:线程间传递任务内容【单向的一个线程向另一个线程交换数据】 等待线程消费数据,否则处于阻塞状态比如要有人准备获取东西:strs.take()才可以往里面装数据内容:strs.put(“aaa”)

3.4 Callable和Runable区别

1.Callable,对Runnable进行了扩展
2.Callable的调用可以有返回值,Runable没有返回值;
3.配合Future使用,可以将执行的结果放入到Future中,通过异步阻塞的方式获取执行结果;

CompletableFuture: 组合任务进行执行和管理

**官网实践:三星项目数据迁移导出数据库表数据、导出静态资源OSS文件数据、**电商网站多个查询接口并行查询数据汇总
CompletableFuture.allOf(futureTM, futureTB, futureJD).join();//对一堆任务进行管理全部完成后执行

官网实践:欧莱雅项目同时判断多个验证只要有一个不符合结果则返回(并行任务验证)

CompletableFuture.anyOf(futureTM, futureTB, futureJD).join();****//一堆任务进行管理,某个任务执行完后立即执行

四. 线程池

4.1 线程池参数

线程池参数:ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue(4),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
1.核心线程数:初始化线程数量
2.最大线程数
3.生存时间–长时间不干活,清理掉核心线程活着,多余的线程进行清理
4.生存时间单位
5.线程队列–线程池可以装多少个任务,不包含线程正在执行的任务
6.线程生产的工厂-产生线程的方式
7.线程拒绝策略 Abort:抛异常 Discard:扔掉,不抛异常 DiscardOlddest:扔掉排队时间最久的 CallerRuns:调用者处理任务

创建线程或者线程池的时候,请指定有意义的名称,方便出错时候进行回溯
如:自定义线程工厂,并根据外部特征进行分组,比如机房信息定义线程组名称,再Jstack问题排查时候,非常有用和有意义

4.2 线程池使用

名称描述说明应用注意说明
SingleThreadExecutor:为什么要有单线程的线程池?1.队列任务 2.生命周期管理 3.线程池维护2个队列【线程的队列、线程任务的队列】ThreadPoolExecutor和ForkJoinPool区别? ThreadPoolExecutor共享一个任务队列 、ForkJoinPool分别有自己的任务队列
CachedPool:缓存线程池线程队列:SynchronousQueue ThreadPoolExecutor来一个任务启动一个线程
FixedThreadPool:固定线程数线程池线程队列:LinkedBlockingQueueCachedPool vs FixedThreadPool CachedPool -任务不平稳 FixedThreadPool- 任务平稳【根据业务进行精确估算和定义】
ScheduledPool定时任务线程池定时任务执行
DelayedWorkQueue等待定时任务线程池隔多少时间后进行调用定时任务假如提供一个闹钟服务订阅这个服务的人特别多10亿人如何优化: 1.服务分而治之负载均衡 2.后面服务利用线程池+队列+多线程
WorkStealingPool分叉组线程:每个线程都有自己单独的队列,空闲的队列会到忙的队列上面获取任务进行执行;优点:线程轻的可以帮助线程重的任务;每个线程都有自己单独的队列线程没有用锁;WorkStealingPool 线程池 每个线程都有自己的队列,让每个任务工作更加均匀,因为首先都到自己的队列中看是否有任务,只有当自己的任务没有才会到其它队列中拿任务,(多个队列的时候不用锁,拿自己的队列的时候不用锁,拿别人的队列要加锁)如果只有一个队列的话,会导致某个线程一直都抢到队列中的任务而工作繁忙
ForkJoinPool处理大任务,大任务切成小任务,大家处理完了然后进行合并;ForkJoinPool:分成子线程进行并行计算每个线程有自己的任务队列Concurrent vs parallel 并发是指任务提交,多个任务同时执行,可能是一个CPU 并行指任务执行-多个CPU进行同时处理 并行是并发的之集

1.需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型

2.每个任务执行的平均时长大概是多少,这个任务的执行时长可能还跟任务处理逻辑是否涉及到网络传输以及底层系统资源依赖有关系

如果是 CPU 密集型,主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu

的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行

线程数,加入 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上

下文切换反而使得效率降低。那线程池的最大线程数可以配置为 cpu 核心数+1

如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 出于空闲状态,

导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等

待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置 cpu 核心数的 2 倍。

一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/

线程 CPU 时间 )* CPU 数目

这个公式的线程 cpu 时间是预估的程序单个线程在 cpu 上运行的时间(通常使用 loadrunner

测试大量运行次数求出平均值)

4.3 线程池状态

线程池状态:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

RUNNING:接受新任务并且处理已经进入阻塞队列的任务
SHUTDOWN:不接受新任务,但是处理已经进入阻塞队列的任务
STOP:不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
TIDYING:所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
TERMINATED:terminated钩子函数运行完成

4.4 压力测试:JMH框架

@Benchmark – 表示启动jmh性能测试
@Warmup(iterations=1, time=3)

iterations=1 预热1次 time=3 预热时间

预热,由于JVM中对于特定代码会存在优化(本地化),预热对于测试结果很重要

预热-对于jvm来说,如果反复执行相同的代码,jvm会进行优化,将这相同的代码进行本地化

@Fork(5) 执行的线程数
@BenchmarkMode(Mode.Throughput)
Throughput:吞吐量模式,
每秒可以执行多少次
执行一次要多少秒

@Measurement(iterations=1, time=3)
方法调用多少遍,在多少时间内
iterations=1 调用次数
time=3 调用的时间

4.4 线程池ThreadPoolExecutor中execute和submit方法对比

方法定义:

// ExecutorService接口中定义的方法: Future<?> submit(Runnable task); <T> Future<T> submit(Runnable task, T result); <T> Future<T> submit(Callable<T> task); // ThreadPoolExecutor中定义的方法: public void execute(Runnable command)

submit方法和execute方法都可以用来提交任务给线程池去执行,但是两者有一些区别,如下:

1、定义方法的类不同
submit是在ExecutorService接口中定义的,而execute方法是在ThreadPoolExecutor类中定义的。

2、返回值类型不同
execute方法返回值为空,submit方法会以Future的形式返回线程的执行结果。

3、对异常的处理方式不同
如果执行的任务中产生了异常,execute方法会直接打印产生的异常的堆栈,由于该异常是在子线程中产生的,主线程中包围在execute方法周围的try-catch语句并不能捕获该异常。

而submit方法提交的子线程如果产生了异常,当调用submit方法返回的Future实例的get方法时,可以在主线程中通过try-catch捕获该异常。这里需要注意的是,如果不调用Future示例的get方法,是不能捕获到异常的。

示例:

public static void main(String[] args) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
    Runnable runnable = new Runnable() {
        public void run() {
            System.out.println("begin run");
            Integer.parseInt("a");
            System.out.println("finish run");
        }
    };
 
    // 测试外层的try catch能否捕获execute方法抛出的异常
    try {
        System.out.println("begin execute");
        pool.execute(runnable);
        System.out.println("finish execute");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("catch e from execute");
    }
 
    // 测试不执行submit方法返回的future的get方法时,外层的try catch能否捕获异常
    try {
        System.out.println("begin submit without get future");
        Future future = pool.submit(runnable);
        System.out.println("finish submit without get future");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("catch e without get future");
    }
 
    // 测试执行submit方法返回的future的get方法时,外层的try catch能否捕获异常
    try {
        System.out.println("begin submit with get future");
        Future future = pool.submit(runnable);
        future.get();
        System.out.println("finish submit with get future");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("catch e with get future");
    }
}

执行结果:

Exception in thread "pool-1-thread-1" begin execute finish execute begin submit without get future finish submit without get future begin submit with get future begin run begin run java.lang.NumberFormatException: For input string: "a" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)begin run  at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at com.nanxs.test.Test03$1.run(Test03.java:15) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "a" at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.nanxs.test.Test03.main(Test03.java:41) Caused by: java.lang.NumberFormatException: For input string: "a" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at com.nanxs.test.Test03$1.run(Test03.java:15) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) catch e with get future

结论:

execute方法执行线程,线程产生的异常会在线程池内部被消耗,execute方法外面并不能捕获到异常;
submit方法执行线程,得到一个future,如果不去尝试获取future的内容,不会有异常抛出。实际上,future的生成是瞬时, 相当于得到一个占位符,具体的操作要去调用future的API才会执行;
submit方法执行线程得到一个future,如果调用这个future的API去获取结果,例如future.get(),如果线程中有异常产生,就可以通过在submit方法周围环绕try…catch来捕获这个异常。

五. 多线程高级框架Disruptor

5.1 Disruptor的特点:环形队列、Sequence表明元素所在的位置

1.对比ConcurrentLinkedQueue : 链表实现 、 Disruptor是数组实现的

2.无锁,高并发,使用环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率

3.实现了基于事件的生产者消费者模式(观察者模式)

5.2 RingBuffer

环形队列:RingBuffer的序号,指向下一个可用的元素 采用数组实现,没有首尾指针,对比ConcurrentLinkedQueue,用数组实现的速度更快

假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用12%8决定

当Buffer被填满的时候到底是覆盖还是等待,由Producer决定

长度设为2的n次幂,利于二进制计算,例如:12%8 = 12 & (8 - 1) pos = num & (size -1)

Disruptor开发步骤

1.定义Event - 队列中需要处理的元素

2.定义Event工厂,用于填充队列 【队列里面装对象已经提前知道,所可以提前分配对象内存和创建对象,后面加入对象的时候只需要进行替换【不需要进行new,不需要分配内存空间-GC产频率会降低】】

这里牵扯到效率问题:disruptor初始化,会调用Event工厂,对ringBuffer进行内存的提前分配、GC产频率会降低、定义EventHandler(消费者),处理容器中的元素

ProducerType生产者线程模式有两种模式 Producer.MULTI和Producer.SINGLE

默认是MULTI,表示在多线程模式下产生sequence如果确认是单线程生产者,那么可以指定SINGLE,效率会提升

Disruptor等待策略
1.生产者不断往里面写数据,如果每个环点都写完了,则进入等待状态,等待消费者消费数据 等待策略有8种:blockingwait策略 等待过程后,生产者由消费者进行唤醒,然后继续往环形队列里面装数据

等待策略:

1:(常用)BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

2:BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

3:LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.

4:LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。

5:PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略

6:TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常

7:(常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

8:(常用)SleepingWaitStrategy : sleep

  • 作者:诸葛小猿
  • 原文链接:https://blog.csdn.net/wuxiaolongah/article/details/122730726
    更新时间:2022-10-16 14:29:35