0

0

SpringBoot线程池和Java线程池怎么使用

PHPz

PHPz

发布时间:2023-05-18 12:46:36

|

1055人浏览过

|

来源于亿速云

转载

    SpringBoot线程池和Java线程池的用法和实现原理

    使用默认的线程池

    方式一:通过@Async注解调用

    public class AsyncTest {
        @Async
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }
    }

    启动类上需要添加@enableasync注解,否则不会生效。

    @SpringBootApplication
    //@EnableAsync
    public class Test1Application {
       public static void main(String[] args) throws InterruptedException {
          ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
          AsyncTest bean = run.getBean(AsyncTest.class);
          for(int index = 0; index <= 10; ++index){
             bean.async(String.valueOf(index));
          }
       }
    }

    方式二:直接注入 ThreadPoolTaskExecutor

    此时可不加 @EnableAsync注解

    @SpringBootTest
    class Test1ApplicationTests {
    
       @Resource
       ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
       @Test
       void contextLoads() {
          Runnable runnable = () -> {
             System.out.println(Thread.currentThread().getName());
          };
    
          for(int index = 0; index <= 10; ++index){
             threadPoolTaskExecutor.submit(runnable);
          }
       }
    
    }

    线程池默认配置信息

    SpringBoot线程池的常见配置:

    spring:
      task:
        execution:
          pool:
            core-size: 8
            max-size: 16                          # 默认是 Integer.MAX_VALUE
            keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
            allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
            queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
          shutdown:
            await-termination: false              # 线程关闭等待
          thread-name-prefix: task-               # 线程名称的前缀

    SpringBoot 线程池的实现原理

    TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

    注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutor 和 taskExecutor

    立即学习Java免费学习笔记(深入)”;

    // TaskExecutionAutoConfiguration#applicationTaskExecutor()
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
          AsyncAnnotationBeanPostProcessor.DEFAUL
              T_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
       return builder.build();
    }
    // ThreadPoolTaskExecutor#initializeExecutor()
    @Override
    protected ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
       BlockingQueue queue = createQueue(this.queueCapacity);
    
       ThreadPoolExecutor executor;
       if (this.taskDecorator != null) {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
             @Override
             public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                   decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
             }
          };
       }
       else {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    
       }
    
       if (this.allowCoreThreadTimeOut) {
          executor.allowCoreThreadTimeOut(true);
       }
    
       this.threadPoolExecutor = executor;
       return executor;
    }
    // ExecutorConfigurationSupport#initialize()
    public void initialize() {
       if (logger.isInfoEnabled()) {
          logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
       }
       if (!this.threadNamePrefixSet && this.beanName != null) {
          setThreadNamePrefix(this.beanName + "-");
       }
       this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    覆盖默认的线程池

    覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

    @Configuration
    public class ThreadPoolConfiguration {
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //设置线程池参数信息
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(50);
            taskExecutor.setQueueCapacity(200);
            taskExecutor.setKeepAliveSeconds(60);
            taskExecutor.setThreadNamePrefix("myExecutor--");
            taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            taskExecutor.setAwaitTerminationSeconds(60);
            //修改拒绝策略为使用当前线程执行
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //初始化线程池
            taskExecutor.initialize();
            return taskExecutor;
        }
    }

    管理多个线程池

    如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

    @Bean("taskExecutor2")
    public ThreadPoolTaskExecutor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

    @Resource
    ThreadPoolTaskExecutor taskExecutor2;

    对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2")
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }

    线程池的四种拒绝策略

    JAVA常用的四种线程池

    ThreadPoolExecutor 类的构造函数如下:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    newCachedThreadPool

    不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());

    newFixedThreadPool

    定长线程池,超出线程数的任务会在队列中等待。

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue());

    newScheduledThreadPool

    类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    周期执行:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(()->{
       System.out.println("rate");
    }, 1, 1, TimeUnit.SECONDS);

    延时执行:

    BgSub
    BgSub

    免费的AI图片背景去除工具

    下载
    scheduledThreadPool.schedule(()->{
       System.out.println("delay 3 seconds");
    }, 3, TimeUnit.SECONDS);

    newSingleThreadExecutor

    单线程线程池,可以实现线程的顺序执行。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }

    Java 线程池中的四种拒绝策略

    • CallerRunsPolicy:线程池让调用者去执行。

    • AbortPolicy:如果线程池拒绝了任务,直接报错。

    • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

    • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

    CallerRunsPolicy

    直接在主线程中执行了run方法。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    效果类似于:

    Runnable thread = ()->{
       System.out.println(Thread.currentThread().getName());
       try {
          Thread.sleep(0);
       } catch (InterruptedException e) {
          throw new RuntimeException(e);
       }
    };
    
    thread.run();

    AbortPolicy

    直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

    public static class AbortPolicy implements RejectedExecutionHandler {
     
        public AbortPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    DiscardPolicy

    什么也不做。

    public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    DiscardOldestPolicy

    • e.getQueue().poll() : 取出队列最旧的任务。

    • e.execute(r) : 当前任务入队。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     
        public DiscardOldestPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

    Java 线程复用的原理

    java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet workers = new HashSet();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    相关文章

    java速学教程(入门到精通)
    java速学教程(入门到精通)

    java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

    下载

    相关标签:

    本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

    相关专题

    更多
    excel制作动态图表教程
    excel制作动态图表教程

    本专题整合了excel制作动态图表相关教程,阅读专题下面的文章了解更多详细教程。

    20

    2025.12.29

    freeok看剧入口合集
    freeok看剧入口合集

    本专题整合了freeok看剧入口网址,阅读下面的文章了解更多网址。

    65

    2025.12.29

    俄罗斯搜索引擎Yandex最新官方入口网址
    俄罗斯搜索引擎Yandex最新官方入口网址

    Yandex官方入口网址是https://yandex.com;用户可通过网页端直连或移动端浏览器直接访问,无需登录即可使用搜索、图片、新闻、地图等全部基础功能,并支持多语种检索与静态资源精准筛选。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    197

    2025.12.29

    python中def的用法大全
    python中def的用法大全

    def关键字用于在Python中定义函数。其基本语法包括函数名、参数列表、文档字符串和返回值。使用def可以定义无参数、单参数、多参数、默认参数和可变参数的函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    16

    2025.12.29

    python改成中文版教程大全
    python改成中文版教程大全

    Python界面可通过以下方法改为中文版:修改系统语言环境:更改系统语言为“中文(简体)”。使用 IDE 修改:在 PyCharm 等 IDE 中更改语言设置为“中文”。使用 IDLE 修改:在 IDLE 中修改语言为“Chinese”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    16

    2025.12.29

    C++的Top K问题怎么解决
    C++的Top K问题怎么解决

    TopK问题可通过优先队列、partial_sort和nth_element解决:优先队列维护大小为K的堆,适合流式数据;partial_sort对前K个元素排序,适用于需有序结果且K较小的场景;nth_element基于快速选择,平均时间复杂度O(n),效率最高但不保证前K内部有序。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    12

    2025.12.29

    php8.4实现接口限流的教程
    php8.4实现接口限流的教程

    PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    134

    2025.12.29

    抖音网页版入口在哪(最新版)
    抖音网页版入口在哪(最新版)

    抖音网页版可通过官网https://www.douyin.com进入,打开浏览器输入网址后,可选择扫码或账号登录,登录后同步移动端数据,未登录仅可浏览部分推荐内容。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    63

    2025.12.29

    快手直播回放在哪看教程
    快手直播回放在哪看教程

    快手直播回放需主播开启功能才可观看,主要通过三种路径查看:一是从“我”主页进入“关注”标签再进主播主页的“直播”分类;二是通过“历史记录”中的“直播”标签页找回;三是进入“个人信息查阅与下载”里的“直播回放”选项。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    18

    2025.12.29

    热门下载

    更多
    网站特效
    /
    网站源码
    /
    网站素材
    /
    前端模板

    精品课程

    更多
    相关推荐
    /
    热门推荐
    /
    最新课程
    Kotlin 教程
    Kotlin 教程

    共23课时 | 2.1万人学习

    C# 教程
    C# 教程

    共94课时 | 5.6万人学习

    Java 教程
    Java 教程

    共578课时 | 39.6万人学习

    关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
    php中文网:公益在线php培训,帮助PHP学习者快速成长!
    关注服务号 技术交流群
    PHP中文网订阅号
    每天精选资源文章推送

    Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号