1.1 tomcat线程池和juc线程流程

jdk 线程策略

tomcat 线程池策略

1.2 tomcat线程池和juc线程池的区别

tomcat 线程池是在 juc线程池的基础上,修改少量代码实现tomcat 线程池执行流程和 juc的线程池执行流程有着很大的区别,那么tomcat为什么要这样设计

使用线程池的任务有两种:

1.3 IO密集型和CPU密集型任务核心参数设置

Tomcat线程池执行流程

自定义队列

Tomcat 主要是通过实现自定义队列来完成逻辑的改造。

/**
 * 实现Tomcat特有逻辑自定义队列
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;

    private transient volatile ThreadPoolExecutor parent = null;

    private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;

    /**
     * 强制遗留的容量
     */
    private int forcedRemainingCapacity = -1;

    /**
     * 队列的构建方法
     */
    public TaskQueue() {
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    /**
     * 设置核心变量
     */
    public void setParent(ThreadPoolExecutor parent) {
        this.parent = parent;
    }

    /**
     * put:向阻塞队列填充元素,当阻塞队列满了之后,put时会被阻塞。
     * offer:向阻塞队列填充元素,当阻塞队列满了之后,offer会返回false。
     *
     * @param o 当任务被拒绝后,继续强制的放入到线程池中
     * @return 向阻塞队列塞任务,当阻塞队列满了之后,offer会返回false。
     */
    public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) {
            throw new RejectedExecutionException("taskQueue.notRunning");
        }
        return super.offer(o);
    }

    /**
     * 带有阻塞时间的塞任务
     */
    @Deprecated
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown()) {
            throw new RejectedExecutionException("taskQueue.notRunning");
        }
        return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected
    }

    /**
     * 当线程真正不够用时,优先开启线程(直至最大线程),其次才是向队列填充任务。
     *
     * @param runnable 任务
     * @return false 表示向队列中添加任务失败,
     */
    @Override
    public boolean offer(Runnable runnable) {
        if (parent == null) {
            return super.offer(runnable);
        }
        //若是达到最大线程数,进队列。
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(runnable);
        }
        //当前活跃线程为10个,但是只有8个任务在执行,于是,直接进队列。
        if (parent.getSubmittedCount() < (parent.getPoolSize())) {
            return super.offer(runnable);
        }
        //当前线程数小于最大线程数,那么直接返回false,去创建最大线程
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }
        //否则的话,将任务放入到队列中
        return super.offer(runnable);
    }

    /**
     * 获取任务
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        //取任务超时,会停止当前线程,来避免内存泄露
        if (runnable == null &amp;&amp; parent != null) {
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    /**
     * 阻塞式的获取任务,可能返回null。
     */
    @Override
    public Runnable take() throws InterruptedException {
        //当前线程应当被终止的情况下:
        if (parent != null &amp;&amp; parent.currentThreadShouldBeStopped()) {
            long keepAliveTime = parent.getKeepAliveTime(TimeUnit.MILLISECONDS);
            return poll(keepAliveTime, TimeUnit.MILLISECONDS);
        }
        return super.take();
    }

    /**
     * 返回队列的剩余容量
     */
    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {
            return forcedRemainingCapacity;
        }
        return super.remainingCapacity();
    }


    /**
     * 强制设置剩余容量
     */
    public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }

    /**
     * 重置剩余容量
     */
    void resetForcedRemainingCapacity() {
        this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
    }
}

2.2 自定义线程池ThreadPoolExecutor

Tomcat 线程池 ThreadPoolExecutor 是继承的 AbstractExecutorService 类,但是很多代码依旧使用的是 JDK 的 ThreadPoolExecutor,只是稍微改造了一部分

    public void execute(Runnable command, long timeout, TimeUnit unit) {

        /**
         * 提交任务的数量+1
         */
        submittedCount.incrementAndGet();
        try {
            /**
             * 线程池内部方法,真正执行的方法。就是JDK线程池原生方法。
             *
             * 因为重写了阻塞队列,才完成Tomcat特有逻辑的实现。
             *
             * 1. 重写队列方法;达到核心线程数,然后向阻塞队列中放置,阻塞队列直接返回false
             * 2. 返回false,则开启maximum pool size;
             * 3. maximum pool size到达极限时,会抛出RejectedExecutionException方法。
             *
             */
            executeInternal(command);
            /**
             * 任务被拒绝
             */
        } catch (RejectedExecutionException rx) {
            /**
             * 在将被拒绝的任务放入到队列中。
             */
            if (getQueue() instanceof TaskQueue) {

                //如果Executor接近最大线程数,应该将任务添加到队列中,而不是拒绝。
                final TaskQueue queue = (TaskQueue) getQueue();
                try {
                    //强制的将任务放入到阻塞队列中
                    if (!queue.force(command, timeout, unit)) {
                        //放入失败,则继续抛出异常
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("threadPoolExecutor.queueFull");
                    }
                } catch (InterruptedException x) {
                    //被中断也抛出异常
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                //不是这种队列,那么当任务满了之后,直接抛出去。
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }
    
    /**
     * JDK线程池的任务执行的逻辑
     */
    private void executeInternal(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        int c = ctl.get();
        //未达到corePoolSize数量,则去开启线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        //开启corePoolSize数量的工作线程,则将任务放入队列。
        //但是Tomcat重写了阻塞队列。
        //当放入workQueue.offer(command)返回false,则继续开启线程数
        if (isRunning(c) &amp;&amp; workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) &amp;&amp; remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {
            //开启最大线程失败,则任务被拒绝。
            reject(command);
        }
    }

注意事项

在这里插入图片描述

tomcat优化

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) {
            decrementWorkerCount();
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                //获取剩余的最小线程数,若配置了允许最小线程数关闭参数min=0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //当配置了允许最小线程关闭参数,且队列不为空的情况下,允许保留的最小线程数为1
                /**
                 * 这是关闭空闲核心线程数时的判断。
                 */
                if (min == 0 && !workQueue.isEmpty()) {
                    min = 1;
                }
                // https://bz.apache.org/bugzilla/show_bug.cgi?id=65454
                // If the work queue is not empty, it is likely that a task was
                // added to the work queue between this thread timing out and
                // the worker count being decremented a few lines above this
                // comment. In this case, create a replacement worker so that
                // the task isn't held in the queue waiting for one of the other
                // workers to finish.
                /**
                 * 若当前工作数量>允许的最小线程数,那么关闭改线程。
                 * Tomcat对此处进行了优化。
                 */
                if (workerCountOf(c) >= min && workQueue.isEmpty()) {
                    return; // replacement not needed
                }
            }
            /**
             * 没有终止线程,又重新开启线程
             */
            addWorker(null, false);
        }
    }

原文地址:https://blog.csdn.net/qq_41956014/article/details/131141941

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_13079.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注