当前位置: 首页 > news >正文

Java——线程池使用

线程池使用

    • 1、线程池的使用模型
      • 1.1、单线程提交—多线程处理
      • 1.2、单线程提交—单线程处理
      • 1.3、多线程提交—单线程处理
      • 1.4、多线程提交—多线程处理
    • 2、本地缓存实现
      • 2.1、设计原理
      • 2.2、实现方案
    • 3、多线程异步执行
      • 3.1、设计原理
      • 3.2、实现方案
      • 3.3、改进方案
    • 4、批量处理任务的执行
      • 4.1、设计原理
      • 4.2、实现方案
    • 5、并发排队队列
      • 5.1、设计原理
      • 5.2、实现方案

1、线程池的使用模型

1.1、单线程提交—多线程处理

通常一个业务处理涉及多个步骤,如果各步骤之间没有直接的关系,那么可以把这些步骤拆分成独立子任务提交到线程池中执行。例如在电商系统设计中,用户要查询商品的详细信息,后端需要将商品的价格、优惠、库存、图片等信息聚合起来一起返回给前端,给用户展现整个商品的详情。串行执行的流程如图所示。

获取价格、获取库存、获取描述之间是完全相互独立的。获取商品详情的时长为T=T1+T2+T3。如果每个步骤都比较耗时的话,获取商品详情会非常慢,会严重影响客户的体验。但如果按照单线程提交多线程处理的模式,可以把获取价格、获取库存、获取描述变成三个独立的子任务提交到线程池中去执行,如图所示。

整个运行的时长就会变成三者的最大时长——T=MAX(T1,T2,T3),这样会极大地缩短获取商品详情的时间,提高接口的响应性能。

在业务系统中,通常会有一些批处理任务。这类任务通常是没有人工参与的,每次要处理的数据量特别大,执行的时间比较长,需要的系统资源比较多,最终会以报告或报表的形式体现任务的执行结果。例如在电商平台中,每天晚上都需要根据门店的订单、支付、营销等情况计算出门店的营业报表。如果平台上有几百万家门店,平台每天晚上都需要执行几百万个任务。按照一家门店一家门店地处理,任务执行的时间会非常长,估计第二天营业了头一天的报表还没有出来。在这种场景中,我们首先可以通过单线程从数据库中获取门店信息,然后按照门店的维度来封装任务,最后将任务提交到线程池中去执行,处理流程如图所示。

通过线程池来处理批次任务,每个线程执行一个门店的营业报表的任务,能够极大地提高报表生成效率,提高整个系统的吞吐量。

网络编程中也可以采用单线程提交—多线程处理的模式来提升效率。例如,服务端给手机App推送一个新的未读信息。在早期的网络编程模型中,系统会在一个线程中完成网络连接监听、数据解析、业务逻辑处理、结果返回等业务逻辑。整个模型采用的是单线程,系统处理完一个客户端的请求之后才能处理另一个客户端的请求,系统的执行效率非常低,如图所示。

我们可以通过线程池提高网络请求的处理效率。如图所示,系统首先启动一个线程来监听服务器的网络端口。当客户端连接时,监听线程就会收到Socket请求。收到网络的请求后,监听线程将Socket请求封装成任务提交到线程池执行。线程池中的工作线程会读取Socket中的数据,然后进行业务处理,并将业务处理结果返回给客户端。每个工作线程负责处理一个客户端Socket请求。线程池可以并行地处理多个客户端的网络请求,极大地提升了服务端的处理效率。

1.2、单线程提交—单线程处理

单线程提交—单线程处理一般用于计划任务的场景。在业务系统中经常会有一些程序或者脚本需要在指定的时间执行,这种调用方式一般就是计划任务。计划任务分为两种:一种是一次性任务,它在指定时间执行;另一种是周期性任务。可以将一次性任务与周期性任务放到线程池中执行,如图所示。

1.3、多线程提交—单线程处理

有些业务场景非常复杂,执行的时间非常长,并且多个任务的执行需要依赖同一个资源。如果多个线程同时进行业务处理会造成资源的冲突,导致所有任务都无法执行。早期的业务系统会调用存储过程,存储过程会涉及多张表的锁,如果多线程同时执行存储过程会造成资源冲突,导致数据库死锁,如图所示。


在这种场景中,业务线程可以先将要处理的任务放入任务队列,通过任务队列来缓冲要执行的任务。任务队列的模式可以采用FIFO,以保证任务执行的先后顺序。线程池会启动一个线程从任务队列中获取任务,然后执行任务,这样就可以通过单线程池来避免依赖同一个资源而造成的冲突,如图所示。

通过多线程提交—单线程处理的模式来避免资源冲突,从而确保业务的正确性。

1.4、多线程提交—多线程处理

多个业务线程同时向线程池提交任务,线程池中多个线程同时执行任务,通过线程池来提高CPU的使用效率与系统的吞吐量。多线程提交—多线程处理是线程池通用的使用模型,如图所示。

Tomcat就是采用多线程来监听用户发起的HTTP请求,然后将请求提交到线程池中,线程池会启动多个线程来响应用户的请求。

2、本地缓存实现

在日常开发中有很多这样的场景:有一些业务系统的配置信息,数据量不大,修改的频率不高,但是访问非常频繁。如果每次程序都从数据库或集中式缓存中获取,受限于硬盘I/O的性能、远程网络访问等,程序执行的效率不高。在这样的业务场景中,我们可以通过本地缓存来提升数据访问的效率。本节基于ConcurrentHashMap与ScheduledThread-PoolExecutor来实现一个线程安全的本地缓存:LocalCache。LocalCache支持永久缓存与临时缓存,永久缓存的数据一直有效,临时缓存的数据在指定时间到期之后会自动从缓存中移除。LocalCache提供了数据安全的增、删、查、改功能,具体方法如表所示。

2.1、设计原理

LocalCache主要由3个部分组成:数据缓存、数据超时时间、数据清除任务。数据缓存和数据超时时间都采用ConcurrentHashMap来存储数据,key为数据存储的值,value是数据的时间戳。数据清除任务采用ScheduledThreadPoolExecutor进行任务调度,默认的任务线程数为1,这样可以避免多线程带来的并发修改问题,同时线程都是内存操作,这样单线程同样具备高性能。本地缓存设计如图所示。

每次向缓存中插入数据时,LocalCache首先会将数据插入到ConcurrentHashMap中。然后判断有没有设置超时时间,如有超时时间,LocalCache会将失效时间插入到Concurrent-HashMap中,并创建数据清除任务,之后将任务提交到ScheduledThreadPoolExecutor线程池中。

每次从缓存中查询数据,LocalCache会直接从ConcurrentHashMap中读取数据。

定时任务线程池会按照超时时间来触发数据清除任务,数据清除任务会从数据时长的缓存池中获取key对应的时间,判断当前key对应的数据是否已经到期了。如果数据已经到期了,LocalCache会调用remove方法将数据从缓存池中移除。

2.2、实现方案

LocalCache作为本地缓存的接口,定义了数据插入、数据删除、数据查询的相关接口方法。DefaultLocalCache是本地缓存的实现类,它实现了LocalCache接口的所有方法。Default-LocalCache定义了两个ConcurrentHashMap的变量:map与timeOutMap。map用来缓存数据信息,timeOutMap用来存储数据失效的时间戳。同时,DefaultLocalCache还定义了数据清除任务ClearTask,ClearTask负责将过期的数据从map中移除。本地缓存的UML图如图所示。

LocalCache定义了两个数据插入的put接口:一个没有到期时间,另一个有到期时间。没有到期时间表示数据永久有效,有到期时间的数据会在到期后从缓存中移除。如下代码是LocalCache接口的代码,该接口提供了两个查询方法:一个是isContainKey,用于判断key在缓存中是否存在;另一个是get,用于直接获取缓存数据。LocalCache提供了remove方法来删除缓存中的数据。

DefaultLocalCache内部定义了3个常量:缓存的默认大小DEFAULT_THREAD_SIZE、最大容量MAX_CAPACITY、定时线程池的大小DEFAULT_THREAD_SIZE。DefaultLocalCache实现如代码所示。



ConcurrentHashMap与ScheduledThreadPoolExecutor的结合使用,使得LocalCache支持永久缓存与临时缓存两种能力。

3、多线程异步执行

下面以电商业务的会员系统为例讲解如何以线程异步执行的方式来提高接口的响应度。会员系统一般会存储会员的资产信息,常见的资产信息有储值、优惠券、会员权益等。

储值、优惠券、会员权益都是采用独立的数据库表结构来存储的。经常需要在一个接口查询会员的所有资产信息,这时需要查询多张表然后进行资产信息的组装,整个链路比较长。

3.1、设计原理

我们可以将储值、优惠券、会员权益的查询拆分成独立的子任务,提交到线程池中进行并行执行,如图所示。之后业务线程可异步获取每个任务的执行结果。

3.2、实现方案

线程池ThreadPoolExecutor提供了多个线程资源来执行线程任务。根据接口支持最小10QPS和最大300QPS来设置线程池的大小与队列长度。

用户资产异步查询的UML图,如图所示。CustomerBenefitsTask获取用户权益,CustomerBalanceTask获取用户余额,CustomerCouponTask获取用户优惠券。CustomerInfo-Service是会员资产的接口服务,定义了两个方法来获取会员的资产信息:一个方法是同步的,另一个方法是异步的。CustomerInfoServiceImpl是会员资产的具体实现类,内部定义了线程池executor来执行具体的线程任务,实现了CustomerInfoServiceImpl接口的功能。


1. CustomerBenefitsTask
CustomerBenefitsTask的代码比较简单​,内部模拟了查询会员权益的方法。


2. CustomerBalanceTask
CustomerBalanceTask实现了Runnable接口,内部模拟了查询账户余额的功能,如代码所示。

3. CustomerCouponTask
CustomerCouponTask实现了Runnable接口,内部模拟了查询优惠券的功能,如代码所示。

4. CustomerInfo
CustomerInfo是会员信息对象,内部定义了会员ID、优惠券列表couponList、储值余额account、会员权益benefitsList等信息。CustomerInfo实现如代码所示。

5. CustomerInfoService
CustomerInfoService是会员资产查询信息的接口,内部定义了同步查询与异步查询会员资产的方法。代码是CustomerInfoService接口的实现。


CustomerInfoServiceImpl内部定义了核心线程数coreSize、最大线程数maxSize、队列长度queueSize、任务队列taskQueue、线程池executor等变量,如代码所示。



getCustomerInfo方法是同步调用来获取会员资产的方法,它首先构建了3个会员资产的查询任务——couponTask、benefitsTask、balanceTask,然后逐个同步调用各个任务对应的run方法来查询资产。

getAsyncCustomerInfo方法是异步获取会员资产的方法,它同样定义了3个会员资产的查询任务——couponTask、benefitsTask、balanceTask,然后将任务提交到线程池中去执行,通过Future来异步获取线程的执行结果。通过线程池的并发执行可以明显缩短接口的执行时间。运行结果如图所示。

整个设计方案采用了FutureTask的异步执行能力与ThreadPoolExecutor线程池的多线程执行能力,实现了会员资产异步查询。但在代码编写上需要多构建CustomerBenefits-Task、CustomerBalanceTask、CustomerCouponTask等任务类,代码实现比较臃肿,在实际项目开发中成本比较高。

3.3、改进方案

Spring Boot框架提供了@Async注解来实现异步调用。在方法上加入@Async注解,在实际执行时Spring Boot会自动将该方法提交到Spring TaskExecutor中,由指定的线程池中的线程执行。会员资产查询—改进方案的UML图,如图所示。

下面是通过@Async注解来完成异步调用的具体实现。

1. AsyncConfig
AsyncConfig用于初始化异步任务的线程池,需要在类上加入@Configuration注解,确保其在Spring Boot项目启动的时候会被初始化。在initPoolTaskExecutor方法内部构建一个异步任务执行的线程池ThreadPoolTaskExecutor。AsyncConfig初始化代码如代码所示。

2. CustomerInfoManager
CustomerInfoManager实现了获取会员资产的3个方法:queryAccount方法用来查询会员的储值;queryBenefits方法用来查询会员的权益;queryCouponList方法用来查询会员的优惠券信息。如代码所示。


3. CustomerInfoServiceImpl
CustomerInfoServiceImpl是会员资产的具体实现类,实现了CustomerInfoService接口的功能,如代码所示。getAsyncCustomerInfo方法通过调用CustomerInfoManager的方法来查询会员的资产信息。

4、批量处理任务的执行

在业务系统中经常会有一些定时或周期性的批处理任务来进行数据的处理与统计,这类任务称为批量处理任务。批量处理任务通常不需要人工参与,每次要处理的数据量特别大,任务执行的时间比较长,对系统的资源消耗比较多。在这种场景中,我们可以通过定时任务+业务线程池来提升任务处理的效率。批量处理任务模型如图所示。


定时任务负责从数据库中获取要处理的数据,然后将业务数据封装成线程任务,提交到线程池中执行。线程任务在完成业务处理后需要更新数据库中的数据状态,确保已经处理过的数据不会被重复处理。

下面以短信营销场景为例分析如何通过线程池来实现批量处理。短信营销是电商系统中最常见的营销模式,通过短信的方式来触达用户。例如,在用户生日的当天,营销系统会发送一条生日祝福的信息;会员卡或者优惠券到期时,营销系统会给用户发送一条提醒信息;活动快开始时,营销系统会发送一条活动开始的提醒信息等。

4.1、设计原理

整个发送信息过程可以分为两个步骤:从数据库中获取要发送信息的手机号、信息内容等,调用短信通道的API来发送信息。单线程发送信息模型如图所示。

获取发送信息的步骤也可以采用定时任务来处理。定时任务由ScheduledExecutor-Service线程池进行调度。发送信息的过程由普通的线程任务来处理,并采用ThreadPool-Executor线程池进行调度。多线程发送信息模型如图所示。

4.2、实现方案

多线程信息发送的UML图如图所示。

其中:

  • MessageScanTask任务负责从数据中获取要发送的信息。
  • MessageSendTask任务负责发送信息。
  • MessageManager是数据访问的模拟实现类,模拟与数据库进行交互。
  • MessageScanService是数据扫描服务的接口,内部定义了扫描任务的接口方法。
  • MessageSendService是信息发送的接口,定义了信息发送的sendMessage方法。
  • MessageServiceImpl是信息发送服务的具体实现类,实现了MessageScanService、MessageSendService接口。

下面对信息发送的实现进行介绍。

1. MessageManager

MessageManager是模拟与数据库交互的实现类,如代码所示。其中,update-SendStatus方法用来模拟更新数据中已经处理的数据状态,scanMessage方法模拟从数据库中读取需要发送信息的数据。


2. MessageScanTask

MessageScanTask负责从数据库中获取要发送的信息并将信息转成任务提交到线程池中去执行。首先调用messageManager的scanMessage方法来获取要发送的信息的任务数据,然后调用MessageSendService的sendMessage方法发送信息。代码是MessageScanTask的具体实现。



3. MessageSendTask

MessageSendTask是负责发送信息的,内部的run方法进行了模拟信息发送的实现,发送完成后会调用MessageManager的updateSendStatus方法来更新信息发送的状态,防止多次发送。代码是MessageSendTask的实现。


4. MessageScanService

MessageSendService是扫描要发送信息的服务接口,它定义了启动与停止扫描任务的方法。代码是MessageSendService的实现。

5. MessageSendService

MessageSendService是信息发送服务接口,内部定义了信息发送的sendMessage方法。代码是MessageSendService的实现。

MessageServiceImpl是信息服务的具体实现类,它实现了信息扫描与信息处理的功能。如代码所示,MessageServiceImpl内部定义了两个线程池:定时任务线程池schedu-leThreadPool,以及工作线程池workThreadPool。scheduleThreadPool的核心线程数是1,大多数时候只有一个任务在线程池中执行。workThreadPool设定了最小线程数100和最大线程数1000。在构造函数内部,MessageServiceImpl完成了定时任务线程池与业务处理线程池的初始化。



sendMessage方法会根据发送的信息数据构造一个信息发送任务MessageSendTask,然后将信息发送任务提交到ThreadPoolExecutor中执行。startMessageScan方法首先会判断扫描任务MessageScanTask是否存在,如果不存在就构建一个扫描任务,然后将扫描任务提交到定时任务线程池中。stopMessageScan方法是用来停止扫描线程工作的,首先会将扫描任务设置到不工作的状态,然后关闭整个线程池。

5、并发排队队列

在电商业务中,为了吸引顾客、聚集人气,运营人员经常会策划一些秒杀活动。通常活动中的商品价格远低于市场价格,例如1元秒杀。秒杀活动一般会严格限制活动时间以及商品的库存数量。因为活动的商家以低廉的价格吸引了大量用户参与,所以在活动开始的瞬间会有几万到几十万的消费者蜂拥而来,在短短几秒时间内将商品抢购一空。如图所示,每次用户尝试下单的时候都会触发扣库存请求,在一瞬间会有几万到几十万的扣库存请求,从而造成数据库的热点数据,导致数据库崩溃。


比较常见的方案是将扣库存的动作放到集中式缓存(如Redis)中进行处理,但也会带来缓存热点数据、缓存与数据库数据的一致性问题。有一种比较好的方案是在应用层进行并发排队,按照商品维度设置排队队列,每个队列设置一个线程,按顺序执行扣库存的操作。这样能确保一台机器在同一个时刻只有一个线程对数据库的同一行记录进行修改操作。

5.1、设计原理

并发排队队列在设计上参照了Hash表的设计思想:在内部定义一个线程池数组table来管理所有线程池;采用LinkedBlockingQueue作为存储任务的链表;采用ThreadPool-Executor线程池来调度线程任务。并发排队队列的设计如图所示。


每次向等待队列中插入一个扣库存请求的时候,会根据商品ID进行散列运算,然后对线程池数组table的长度进行取模,这样确保同一个商品的库存请求进入同一个任务队列。table数组的长度必须是2的N次方,这样可以将取模运算变成“&”运算。Thread-PoolExecutor线程池的核心线程数和最大线程数都为1,因此始终只有一个线程来更新数据库的库存。

5.2、实现方案

ItemTask是扣库存的线程任务,实现了Callable接口,获得了异步执行并返回执行结果的能力。DispatchQueue是并发排队的接口,定义了线程排队等待的submit方法。DispatchQueueImpl是排队等待的具体实现类,定义了队列数dispatchQueueSize、线程池数组table等变量。并发队列的UML图如图所示。

1. ItemTask

ItemTask模拟扣库存的任务,实现了Callable接口,获得了返回线程任务执行结果的能力。如果任务执行成功则返回true,如果失败则返回false。ItemTask实现如代码所示。


2. DispatchQueue

DispatchQueue是并发排队服务的接口,它定义了任务排队的接口方法submit,如代码所示。


3. DispatchQueueImpl

DispatchQueueImpl是并发排队服务的具体实现类,它将同一个商品的所有扣库存的请求都分配到同一个线程池中。线程池只有一个线程来处理扣库存的任务,从而实现了扣库存请求在线程池的任务队列中等待的功能。DispatchQueueImpl内部定义了线程池的线程数THREAD_SIZE、线程存活时间KEEP_LIVE、默认等待队列大小THREAD_POOL_SIZE等常量。DispatchQueueImpl定义了等待队列大小dispatchQueueSize、线程池数组table等变量。代码是DispatchQueueImpl的具体实现。



DispatchQueueImpl默认的等待队列数是32,也就是会创建32个线程池,每个线程池中只有一个线程。在构造函数内部,DispatchQueueImpl会调用initTable方法来完成所有线程池的初始化。

submit方法是实现排队等待的核心方法,它首先会根据商品ID计算商品对应的线程池数组的index,然后获取index对应的线程池,最后调用线程池ThreadPoolExecutor的submit方法向线程池中提交异步任务。运行结果如图所示。

http://www.cnnetsun.cn/news/2856800.html

相关文章:

  • STM32F4实战:5分钟搞定CANopen快速SDO通信,读取节点数据就这么简单
  • 别急着点‘忽略’!深入理解IntelliJ IDEA的File Cache机制,避免团队协作中的代码覆盖风险
  • SOLIDWORKS 2024导出DWG图纸,TrueType和SHX字体到底怎么选?看完这篇不再纠结
  • 别再为嵌入式打印浮点数发愁了!手把手教你魔改SEGGER RTT的printf函数
  • 我让 Claude Code 帮我把求职流程自动化,740 个岗位后拿下了 Dream Offer
  • 2022-TKDE《Low-Rank Linear Embedding for Robust Clustering 》
  • 程序间博弈研究:有限状态机竞争、进化与不同游戏策略分析
  • 2026图片去水印工具推荐免费电脑手机在线,好用的图片去水印软件无广告
  • iOS 27 即将发布,哪些 iPhone 机型可升级?何时能用上?
  • 皮阿诺全系高环保板材实现ENF/F4星双达标!权威鉴证,环保安芯
  • UI-App 技术架构分析
  • UG/NX模型转换GLB格式技术规范文档(在线无损转换方案)
  • QMCDecode:3步快速解密QQ音乐加密格式的终极Mac工具指南
  • AI搜索品牌排名检测:结合LangChain实测5大AI平台,100次查询排名波动分析
  • 2026宁波市权威认证贵金属回收 TOP5+黄金回收白银回收铂金回收门店地址电话推荐
  • WarcraftHelper技术解析:重构经典魔兽争霸III的现代游戏体验
  • 嵌入式Linux学习
  • 当“空中巨龙”遇见“AI大脑”:国内顶尖AI讲师颜少林在蓉城玩转工业大模型
  • 破壁机“修不好”?客服小李用一颗10uF钽电容解决了四次返修难题
  • linux qnx git 命令 1
  • 纷享销客、八百客、用友CRM:行业应用与选型建议
  • 一本好书:吃透 Agentic AI 核心不踩坑
  • 报警画面设计误区盘点:这些错误你犯了几个?
  • WWDC26 全程解读:苹果牵手谷歌 Gemini,Siri 重生为「Siri AI」,但中国用户要再等等
  • 【Java 入门 Day11】 三大修饰符(上):abstractstatic 篇
  • 066、Demosaic 去马赛克算法:双线性、VNG、边缘自适应插值的画质与算力对比
  • 知识追踪驱动的自适应学习系统:基于贝叶斯网络的算法训练
  • 慢查询优化八股文:抓住这 8 个关键点,面试基本稳了
  • EldenRingSaveCopier:拯救你的艾尔登法环游戏进度的终极方案
  • 车流流速智能解析算法,赋能高速路况动态视频孪生调度