当前位置:首页 » 手机赚钱app » 正文

任务赚钱平台源码

0 人参与  2019-09-06 13:03  分类 : 手机赚钱app  点这评论

任务赚钱平台源码  新浪港股讯9月6日音讯,苹果见解股涨幅扩年夜,制止发稿,瑞声科技涨6.65%领涨蓝筹;丘钛科技涨5.94%;舜宇光学涨5.21%;比亚迪电子涨4.74%;高伟电子涨4.17%。

义务编辑:马婕

任务赚钱平台源码java.util.concurrent.ThreadPoolExecutor 核心线程池,继承自AbstractExecutorService,实现为了ExecutorService以及Executor接口。 线程池的长处 低落功能消耗、提高响应速度:对于付使用必要频繁创立线程,而且线程任务都比力简单,比如一些IO任务,线程的性命周期都很短;而线程的创立必要耗费肯定的CPU工夫,所以当任务到来时假如线程曾经经预备停当了,而不是从头创建,则会年夜年夜提高系统的响应速度。 对于线程的会合操持监控:将创建的线程规约在线程池里,则能够对线程的数量以及运行形态进行操持并进行监控,可对系统的线程资本进行会合管理。 线程池道理

源码API文档的描摹如下 Core and maximum pool sizes

ThreadPoolExecutor会按照corePoolSize和maximumPoolSize的界限主动的调停线程池的大小。 1)当经过execute(Runnable)提交任务时,而且正在运行的线程数少于corePoolSize,即使其余线程处于闲暇形态,也会创建一个新的线程实行这个任务;2)假若有大于corePoolSize可是小于maximumPoolSize数量的线程正在运行,则新提交的任务会放进workQueue进行任务缓存,可是如果workQueue已经满,则会间接创建线程实行,但是如果创建的线程数大于maximum pool sizes的工夫将回绝任务。 3)** 当corePoolSize和maximumPoolSize 相称时则会创建牢固命量的线程池 4)将maximumPoolSize 配置为无界限的**,比如整数的最大值,则象征着线程数和任务数量同等,也就不等待的任务 5)corePoolSize、maximumPoolSize能够按如实际需要经过布局器配置,也可以动态的在运行时设置。 On-demand construction

按照需要布局线程 1)默认环境下,每一个核心线程只要当有新任务到来时才会初始化创建,并执行 2)但是可以在运行时可以通过prestartCoreThread(一个coreThread)大约prestartAllCoreThreads(局部coreThread)来提早创建并运行指定的核心线程,这种需求适用于初始化线程池时,任务队列初始没有为空的环境下。 Creating new threads

创建线程 1)创建线程是通过ThreadFactory。除了非特此外设定,否则默认利用Executors.defaultThreadFactory作为线程池,这个线程池创建的局部线程都有雷同的线程组,线程优先级,非保卫线程的标记 2)通过使用差此内线程池,可以变动线程的名字,线程组,优先级,保卫标记等等 3)当通过newThread()挪用线程池创建线程池失利时,前往null,此时执行器会继承运行,但是大约处理惩罚不了任何任务

4)线程需要处理惩罚"modifyThread" RuntimePermission,对线程改正造走运行时权限检查。如果利用这个线程池的事变线程大概其余线程没有处理这个认证"permission"则会使服务升级:对付线程池的全部设置都不会及时的见效,一个曾经经封闭的线程池大概还会处于一种线程池停止没有实现的状态 Keep-alive times

非core线程的闲暇存活时间 1)当这个线程池此时含有过剩corePoolSize的线程存在,则过剩的线程在空闲了高出keepAliveTime的时间将会被停止 2)这供给了一种淘汰空闲线程从而低落系统线程资本消耗的方法,还可以通过setKeepAliveTime进举措态设置 3)默认情况下,keep-alive policy只对超越corePoolSize的线程起感化,但是可以通过方法allowCoreThreadTimeOut(boolean)将空闲超时计谋异样应用于coreThread,但是要保证超不断间不为0值 Queue

任何BlockingQueue均可以被用来包容和传达提交的任务 1)如果正在运行的线程小于corePoolSize,则executor会新增一个线程而不是将任务入队 2)如果正在运行的线程大于corePoolSize但是小于maximumPoolSize,executor则会将任务入队,而不是创建一个线程 3)如果任务不能入队(队列已经满),则在没有超越maximumPoolSize的情况下创建一个新的线程,否则某种计谋回绝这个任务。 three general strategies for queuing

三种入队策略 1)Direct handoffs:间接传达。比如 synchronousQueue,这个队列比力特别,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。 2)Unbounded queues:无界队列。比如 没有指定容量的LinkedBlockingQueue,这将会使coreThread不停事变,而且因为任务总能入队,所以也不会创建其他高出corePoolSize的线程。用于所有任务完整自力,不相干,比如平滑刹时高并发web页面的哀求等,实在相称于异步框架了 3)Bounded queues:有界队列。 比如ArrayBlockingQueue,有助于在设置无限大的maximumPoolSizes时,制止形成系统资源的干涸。 队列大小和最大池大小可能需要相互调和:使用大队列和小池最大限制地淘汰CPU的使用,操纵系统资源,和高低文切换开销,但可能会导致报酬的低吞吐量。如果任务常常被阻塞(比方,如果它们是I/O绑定),系统可能比你答应的时间布置更多线程的时间。使用小队列凡是是需要更大的池大小,这使患上CPU忙碌,但可能会碰到不可担当的调节开销,这也降低吞吐量。 Rejected tasks

当提交一个新任务时,如果Executor已经封闭或者者无限的workQueue,maximumPoolSizes,并且他们已经饱和了,只要呈现其中一种情况都会被拒绝。有四种已经定义的处理策略

ThreadPoolExecutor.AbortPolicy: 抛弃任务并抛出
RejectedExecutionException十分。默认
ThreadPoolExecutor.DiscardPolicy:也是抛弃任务,但是不抛出十分。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最后面的任务,而后从头实行执行任务(反复此进程)
ThreadPoolExecutor.CallerRunsPolicy:由挪用线程处理该任务

还可以继续RejectedExecutionHandler自定义完成 Hook methods 供给在每一个任务执行时差别阶段执行不同的处理函数 protected void beforeExecute(Thread t, Runnable r):优先使用指定的线程处理给定的任务,并在任务执行前做一些处理(如设置ThreadLocal变量或者者记录一些日记等),t为执行r任务的线程,r为提交的任务。 protected void afterExecute(Runnable r, Throwable t):任务执行完成时处理。r为执行完的任务,t为指定的形成任务终止的异常,如果设置为null则执行会一般完成,不会抛出异常 protected void terminated():当Executor终止时,被调用一次

以上三个方法都为空方法,使用者自行实现。在进行多层嵌套时都要表现调用 super.method() 完成下层的处理函数。如果在调用方法时产生异常,则外部的工作线程可能会顺次失利,忽然终止。

可以继承ThreadPoolExecute,并实现上述多少个Hook方法来检测线程池的状态,自定义本身的线程池,如监控任务的均匀、最大、最小执行时间,来发明有无同等阻塞的线程任务。 ThreadPoolExecutor源码详解 1-类继承结构

public class ThreadPoolExecutor extends AbstractExecutorService ThreadPoolExecutor 继承自AbstractExecutorService

明白ThreadPoolExecutor需要先明白下面的这些参数 2-线程池状态runState与workerCount

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 

使用**AtomicInteger **的CAS机制来实现对运行时状态和工作线程计数的并发一致性操纵,低29位(32-3)用来保存workerCount,所以workerCount的最大为2^29 -1 。高3位用来保存runState,多么实现具备较高服从。

线程池状态迁徙 3-其他成员

// 缓存任务阻塞队列
private final BlockingQueue workQueue;
// 线程池主锁,用于拜候worker线程集,另有其他对于线程池信息的记录信息(比如线程池大小,runState)
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程聚集,拜候时需获患上mainLock 
private final HashSet workers = new HashSet();
// mainLock上的终止前提量,用于撑持awaitTermination
private final Condition termination = mainLock.newCondition();
// 记录已经创建的最大线程数,访问需获得mainLock
private int largestPoolSize;
// 对已经完成任务进行计数,只要在工作线程终止时才会更新,访问需要获取mainLock
private long completedTaskCount;

/**
 * 如下所有变量都为volatile范例的,以便能使所有操作都基于最新值 
 * (因为这些值均可以通过对应的set方法,在运行时动态设置),
 * 但是不需要获取锁,由于所有外部一致性不依靠这些参数的同步访问来保证
 */ 
// 用于创建新线程的线程工厂
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
// 设置默认任务拒绝策略
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

// 对于调用线程池的shutdown(),shutdownNow()方法权限认证
 private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

内部Worker类实现

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

轻量级独有重入锁,重要防备在执行任务期间被停止干扰。并实现了Runable接口,run方法代理到外层runwork方法主轮回上。 4-四种构造函数

  // Public constructors and methods
  // 1. 设置缓存任务队列
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
        // 调用构造函数4
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  // 2. 设置缓存任务队列,线程工厂
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory) {
        // 调用构造函数4
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

  // 3. 设置缓存任务队列,拒绝策略
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        // 调用构造函数4
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

  // 4. 底层的构造函数,提供其他构造函数包装的底子
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize 

前三种构造方法简化了末端一种线程池设置,提供了一些默认设置 5-execute方法详解

    public void execute(Runnable co妹妹and) {
        if (co妹妹and == null) // 空任务抛出异常
            throw new NullPointerException();
       /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
       
        int c = ctl.get();
        // 1. 如果工作线程数小于核心线程数,则增加新的线程
        if (workerCountOf(c) 

对线程池运行状态进行判定,并执行相应的操纵策略 6-内部实现addWorker方法详解

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 对 runState进行轮回获取和判定,如果不满意增加前提则前往false
        retry:
        for (;;) {
             // 获取runState,和workerCount
            int c = ctl.get();
            int rs = runStateOf(c);

            // 对线程池状态进行判断,能否得当添加新的线程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            // 进行CAS设置workerCount,失败重试
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 设置成功,则跳出最外层循环
                if (compareAndIncrementWorkerCount(c)) 
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果workerCount没有设置成功,而且runState产生变革,
                // 则继续最外层的循环,对runState重新获取和判断
                if (runStateOf(c) != rs) 
                    continue retry;
            }
        }
       
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 添加新线程需获取全局锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   // 获取锁后,对runState进行再次检查
                    int rs = runStateOf(ctl.get());
                    if (rs  largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // 启动线程
                    workerStarted = true;
                }
            }
        } finally { // 因为中途发生异常而没有让添加的线程启动,则回滚
            if (! workerStarted)
                addWorkerFailed(w); // 线程回滚
        }
        return workerStarted;
    }

    /**
     * Rolls back the worker thread creation.
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // 对线程回滚需要获取全局锁
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w); // 从工作线程聚集中移除了添加失败的线程
            decrementWorkerCount(); // 减少工作线程计数

            // 因为停止异常而没有启动线程,从而回滚已入队的线程
            // 这其中断异常可能是关闭线程池时发生的,所以该当将终止线程池的信号传播
            tryTerminate(); // 实行终止线程
        } finally {
            mainLock.unlock();
        }
    }



在外循环对运行状态进行判断,内循环通过CAS机制对workerCount进行增加,当设置成功,则跳出外循环,否则进前进行内循环重试

外循环以后,获取全局锁,再次对运行状态进行判断,符合条件则添加新的工作线程,并启完工作线程,如果在末端对添加线程没有末尾运行(可能发生内存溢出,操作系统无法分派线程等等)则对添加操作进行回滚,移除以前添加的线程

这里提一下tryTerminate()方法,对以后情况的一种预判:执行上述操作可能是采取中断来关闭线程池的一种可能,所以得调用tryTerminate()方法传播关闭的信号,这个方法重要的成果便是中断等待在阻塞队列的Worker线程去检查能否线程池终止或者设置的变革,当线程池在终止以前或者工作线程为0时调用terminated()钩子方法,而后关照唤醒等待线程池终止的Caller线程。 7-工作线程主循环函数

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null; // 将线程初始任务设为null
        w.unlock(); // 执行任务前,释放锁,答应被中断
        boolean completedAbruptly = true; // 因为运行异常导致线程忽然终止的标志
        try {
            // 获取任务,如果没有任务可以获取,则此循环终止,
            // 这个工作线程将结束工作,等待被整理前的注销工作
            while (task != null || (task = getTask()) != null) {
                w.lock(); // 执行任务之前获取工作线程锁

                // 如果线程池关闭,则确保线程被中断
                // 如果线程池没无关闭,则确保线程不被中断
                // 这就请求在第二种情况下,进行重新检查,处理shutdownNow正在运行同时打扫中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); // before hook
                    Throwable thrown = null;
                    try {
                        task.run(); // 运行任务

                    // 抛出异常,导致以后任务终止,工作线程加入
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); after hook
                    }
                } finally {
                    task = null; 
                    w.completedTasks++; 
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // (可能是因为没有任务可做一般结束,或者发生异常而导致线程异常结束)
            // 注销信息,移除结束线程,然后根据情况添加新的线程等
            processWorkerExit(w, completedAbruptly);
        }
    }
    // 处理线程加入函数
   private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //  如果是用户代码task.run()异常突然退出
            decrementWorkerCount(); // 则减少 workerCount的计数
        // 否则为正常退出,在getTask函数里已经进行了减少workerCount的操作


        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();  // 获取全局锁 
        try {
            completedTaskCount += w.completedTasks; // 登记信息
            workers.remove(w); // 移除线程
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // 线程池是否处于运行状态
            if (!completedAbruptly) { // 正常退出
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1; // allowCoreThreadTimeOut  情况下需要的起码线程数
                // 如果线程数量大于即是正常工作的数量则再也不添加新的线程
                if (workerCountOf(c) >= min) 
                    return; // replacement not needed
            }
            addWorker(null, false); // 添加worker线程
        }
    }


Worker工作者类继承了AQS基于独有锁实现了Worker线程的同步语义,而没有通过ThreadPoolExecute的全局重入锁来保证同步,得到了功能上的优化,让锁的粒度更大雅。Worker具备锁的语义是想让工作线程在没有正在执行任务时(特殊是当从阻塞队列获取task时被阻塞)能够被在终止线程池时肯定能感知外界的变化,所以采取了锁的语义,而且同时能保证任何内部因素不能打扰正在执行task的工作线程,除非调用中断线程的Thread.interrupt()方法能被task.run()方法内部响应,比如在run方法内检查中断标志,如果发生中断则抛出异常,则会导致工作线程退出。

通过循环,获取队列中的任务,在获取锁以后首先对线程池状态进行判断,并执行任务,在任务执行结束则释放锁,所以在空闲时间,这个锁可以被其他方法获取,从而实现对空闲线程的中断,而对正在执行任务的线程则需要通过interrupt()方法来中断(详见shutdownNow()方法)

当无法通过getTask()获取任务时,或则执行任务期间发生异常(用户代码异常),则执行processWorkerExit()方法,移除已经结束或则突然逝世亡的工作线程,然后根据情况添加新的线程等。

需要留意的是:worker线程执行任务时获取的worker内部基于AQS同步器实现的锁,而不是全局锁,所以当线程池完成预热(线程数到达corePoolSize)时,执行任务再也不需要获取全局锁,增加了任务的吞吐量,防备获取全局锁带来的性能瓶颈。 8-getTask 从缓存队列获取任务

    private Runnable getTask() {
        boolean timedOut = false; // 记录上一次获取任务是否超时的标志
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); // 因为上述条件无法获取任务,则减少workerCount数量
                return null; // 返回 null,导致工作线程主循环结束,并移除该工作线程
            }

            int wc = workerCountOf(c);

            // 根据线程数量判断是否工作线程需要被移除
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // 包括对超时的判断,如果发生超时,则阐明该worker已经空闲了
            // keepAliveTime时间,则该当返回null,多么会使工作线程正常结束,
            // 并被移除
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 超时获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果在keepAliveTime时间内获取就任务则返回
                if (r != null)
                    return r;
                // 否则将超时标志设置为true
                timedOut = true; 
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


通过逝世循环来对线程池状态进行判断,并获取任务,在超时发生之前发生中断则重置超时标志位false并进行重试,如果获取就任务则返回任务
主要来看一下是怎么样实现移除空闲keepAliveTime线程的:workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法从任务队列中定时获取任务,如果超时,则阐明线程已经在等待了keepAliveTime都没有得到任务,则将超时标志设为true,鄙人一次循环时进行判断,如果发明上一次获取任务发生超时,则立即返回null,这时worker线程主循环将正常结束,并移除结束的worker。


9-shutdown和shutdownNow以及awaitTermination方法详解
 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();  // 获取全局锁
        try {
            checkShutdownAccess(); // 调用线程权限检查
            advanceRunState(SHUTDOWN); // 将runState状态设为SHUTDOWN
            interruptIdleWorkers(); // 中断所有空闲状态的线程,通过获取Worker的锁来实现
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate(); // 尝试终止,执行terminated hook方法
    }

public List shutdownNow() {
        List tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); //将runState状态设为SHUTDOWN
            interruptWorkers(); // 直接调用worker的中断方法,对所有worker进行中断
            tasks = drainQueue(); // 队列 --> 集合
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 如果所有提交的任务已经完成,则立即返回true
                if (runStateAtLeast(ctl.get(), TERMINATED)) 
                    return true;
                // 已经超时,则返回false
                if (nanos 

总结:两个方法都是通过调用worker的interrupt来中断工作线程,但是线程如果没有响应中断(task.run()方法没有感知到中断的发生,而且也没有抛出异常),则任务不可能被立刻终止。如果想要响应中断,可以在task.run()方法中计划成响应中断,然鄙人一次循环中判断中断标志来终止工作线程中。

shutdown方法将状态设置为SHUTDOWN,并拒绝新增任务,之条件交的任务如果尚未末尾则不会被执行;调用isShutdown返回true,但是调用isTerminaed返回false,关闭所有空闲的线程,正在执行的线程将会将任务执行完成。

shutdownNow方法将状态设置为STOP,并拒绝新增任务,之条件交的任务如果尚未开始则不会被执行;调用isShutdown返回true,但是调用isTerminaed返回true,关闭所有空闲的线程和正在执行的线程,所以有的任务已经开始但是可能不会完成(将损失部分任务),并将之前提交的没有开始执行的任务列表返回。

awaitTermination方法将状态设置为TERMINATED,并拒绝新增任务,调用isShutdown返回true,但是调用isTerminaed返回false,超时等待所有提交的任务的完成。 10- 方法列表总览 怎么样公道配置线程池大小?

Java线程底层映射到操作系统原生线程,而且Java在windows和linux平台下,一个Java线程映射为一个内核线程,而内核线程和CPU物理核心数同样,所以Java线程和CPU核心是一对一的关连,将线程池的工作线程设置为与物理核心相称能做到真正的线程并发,如果设置线程数多于核心则会在核心线程之间不断的切换。

日常需要根据任务的范例来配置线程池大小: 如果是CPU麋集型任务,就需要尽管压迫CPU,参考值可以设为 Num(CPU+1) 如果是IO麋集型任务,参考值可以设置为2Num(CPU)*

固然,这只是一个参考值,具体的设置还需要根据现真相况进行调停,比如可以先将线程池大小设置为参考值,再不雅察任务运行情况和系统负载、资源使用率来进行得当调整。 使用线程池的发起 发起使用有界队列,有界队列能增加系统的稳定性和预警本领,防止资源过分消耗,撑爆内存,使得系统崩溃不可用。 提交到线程池的task之间要尽管保证相互自力,不能存在相互依靠,否则可能会造成死锁等其他影响线程池执行的来由起因。 提交到的线程池的task不要又创建一个子线程执行别的任务,然后又将这个子线程任务提交到线程池,这样会造成混乱的依赖,最终导致线程池崩溃,最佳将一个task用一个线程执行。

<< 上一篇 下一篇 >>

推荐文章列表

标签列表

友情链接