Executors框架

Executors框架1 前言 通常 java 最简单的线程的例子是这样的 public static void main String args Runnable runnable gt System out println Thread is running Thread thread new

大家好,我是讯享网,很高兴认识大家。

1 前言

  通常java最简单的线程的例子是这样的:

 public static void main(String[] args) { Runnable runnable = () -> System.out.println("Thread is running."); Thread thread = new Thread(runnable); thread.start(); }

讯享网

        在小的示例程序中如上述实践是可以的;但在大规模的应用中将线程的管理和创建应用的其它部分分开则是合理的。封装了线程的管理和创建函数的对象就是executors,可以提高管理效率和降低线程反复创建和销毁带来的开销。

线程的创建因为涉及到和操作系统的交互所以开销会比较大

  那么封装了线程管理和创建这些功能的对象就是 java.util.concurrent.Executors


讯享网

2 Executor线程池

  Executor就是java.util.concurrent.Executors通过上述红框中几个静态方法创建的执行器(线程池)线程池就是用于执行提交的Runnable任务。该接口将任务的提交和任务如何被执行(包括线程的使用细节、任务排程等)解耦。Executor线程池的使用过程中不需要显示的创建线程。你不再需要像 new Thread(new(RunnableTask())).start()一样为任务集合中的每一个任务创建线程来执行任务,线程池的使用如下:

讯享网Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2());

2.1 newFixedThreadPool

Executors.newFixedThreadPool(10);

2.2 newSingleThreadExecutor

讯享网Executors.newSingleThreadExecutor();

2.3 newCachedThreadPool

Executors.newCachedThreadPool();

2.4 手动创建线程池

在alibaba的《java编程手册》中提到:

4.【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

1) newFixedThreadPool(int)和newSingleThreadPool():

允许的请求队列长度为Integer.MAX_VALUE,可能会对接大量的请求,从而导致OOM。

2) newCachedThreadPool():

允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

在后续对线程池模型的详细讲解中,可以印证该实践的合理性。

老猿说说-SynchronousQueue_老猿说说的博客-CSDN博客引导语SynchronousQueue 是比较独特的队列,其本身是没有容量大小,比如我放一个数据到队列中,我是不能够立马返回的,我必须等待别人把我放进去的数据消费掉了,才能够返回。SynchronousQueue 在消息队列技术中间件中被大量使用,本文就来从底层实现来看下 SynchronousQueue 到底是如何做到的。1 整体架构SynchronousQueue 的整体设计比较抽象,在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈,两种算法被两个内部类实现,而直接对外的 https://blog.csdn.net/zlfing/article/details/

https://www.cnblogs.com/aspirant/p/10731108.htmlhttps://www.cnblogs.com/aspirant/p/10731108.html

讯享网/ * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 
  
    
  

2.4.1 ThreadPoolExecutor

        对创建函数 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)的参数进行一下详细说明:

  • corePoolSize int

  始终保持在线程池中的线程数量,即使这些线程是闲置的。特别说明:如果属性allowCoreThreadTimeOut被设置成了false,那么核心线程释放的时间也以keepAliveTime为准。

//设置allowCoreThreadTimeout ThreadPoolExecutor leader5ThreadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new CustomizableThreadFactory("消费线程-")); leader5ThreadPoolExecutor.allowCoreThreadTimeOut(true); //源码 public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } }

  关于核心线程数大小的设置有传说中的大厂经验:

CPU密集型 核心线程数=CPU核数+1
IO密集型 核心线程数=CPU核数 * 2

  • maximumPoolSize int

  线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

  • keepAliveTime long

  当线程数量超出核心线程池数量,那么超出的部分在被关闭前的最大闲置时间

  • unit TimeUnit

  keepAliveTime参数的单位

  • workQueue BlockingQueue<Runnable>

  任务在被执行前放置在该队列

  • threadFactory ThreadFactory

  线程池创建线程时使用的创建工厂;线程池有个默认的线程工厂

讯享网ThreadFactory threadFactory = Executors.defaultThreadFactory();
  • handler RejectedExecutionHandler

  the handler to use when execution is blocked because the thread bounds and queue capacities are reached.

我们着重解释下ThreadFactory和RejectedExecutionHandler

2.4.2 ThreadFactory

ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(@NotNull Runnable r) { return null; } };

2.4.3 RejectedExecutionHandler

  1. 默认是AbortPolicy;该策略直接丢弃提交任务,并抛出RejectedExecutionException异常
  2. DiscardPolicy:该策略也是将任务抛弃掉(对于提交的任务不管不问,什么也不做),不过并不抛出异常。
  3. DiscardOldestPolicy:该策略是当执行器未关闭时,从任务队列workQueue中取出第一个任务,并抛弃这第一个任务,进而有空间存储刚刚提交的任务。使用该策略要特别小心,因为它会直接抛弃之前的任务。
  4. CallerRunsPolicy:该策略并没有抛弃任何的任务,由于线程池中已经没有了多余的线程来分配该任务,该策略是在当前线程(调用者线程)中直接执行该任务。
讯享网 public static class Task implements Runnable { protected String taskName; public Task(String name) { super(); this.taskName = name; } @Override public void run() { try { System.out.println(this.taskName + " is running."); Thread.sleep(500); } catch (Exception ignored) { } } } public static void main(String[] args) { // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1,"线程池"的阻塞队列容量为1。 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1)); // 设置线程池的拒绝策略为AbortPolicy pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { } }); try { // 新建10个任务,并将它们添加到线程池中 for (int i = 0; i < 10; i++) { Runnable myTask = new Task("Task-" + i); pool.submit(myTask); } } catch (RejectedExecutionException e) { e.printStackTrace(); // 关闭线程池 pool.shutdown(); } }

2.4.4 ScheduledThreadPoolExecutor

定时器ScheduledExecutorService原理分析_yegeg的博客-CSDN博客ScheduledExecutorService是怎么存放任务,获取即将要执行的任务,它是怎么实现周期性调用的,执行的任务延时这段时间,线程又做了什么,他和线程池有什么关系https://blog.csdn.net/yegeg/article/details/

3 实践整理

3.1到底应该创建几个线程池

3.2核心线程怎么实现一直存活?

阻塞队列方法有四种形式,它们以不同的方式处理操作,如下表。

抛出异常    返回特殊值    一直阻塞    超时退出
插入    add(e)    offer(e)    put(e)    offer(e,time,unit)
移除    remove()    poll()    take()    poll(time,unit)
检查    element()    peek()    不可用    不可用
核心线程在获取任务时,通过阻塞队列的 take() 方法实现的一直阻塞(存活)。
 

4 参考资料

《Java线程池实现原理及其在美团业务中的实践》https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
小讯
上一篇 2025-03-02 12:13
下一篇 2025-02-16 14:46

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/52483.html