Code Bye

关于Executors.newScheduledThreadPool(1);批处理设计

public abstract class AbstractBatchJobProcessor implements JobProcessor
{
    private final static  ExecutorService executors = Executors.newCachedThreadPool();

    private ScheduledExecutorService schedulerExecutors = Executors.newScheduledThreadPool(1);

    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    
    /**
     * 处理包大小
     * 
     * <p>
     * TODO 方法功能描述
     * 
     * @return
     * @return int
     */
    abstract public int getPackageSize();

    /**
     * 处理周期(单位:分)
     * 
     * <p>
     * TODO 方法功能描述
     * 
     * @return
     * @return int
     */
    abstract public int getCycle();
    
    /**
     * 获取任务列表
     * 
     * <p>TODO 方法功能描述
     * 
     * @return
     * @return List<Runnable>
     */
    abstract public List<Runnable> getTaskList();
    
    
    // 每天凌晨0点触发
    @Override
    public void process(ProcessContext context)
    {
        
        List<Runnable> taskList = getTaskList();
        
        if(taskList == null || taskList.size() == 0)
        {
            return;
        }
            
        for(Runnable task : taskList)
        {
            addTask(task);
        }
        
        start();
    }

    public void start()
    {
        schedulerExecutors.scheduleAtFixedRate(new TaskWorker(), 1, getCycle(), TimeUnit.MINUTES);
    }

    private class TaskWorker implements Runnable
    {

        @Override
        public void run()
        {
            int i = 0;
            while (!queue.isEmpty() && i++ < getPackageSize())
            {

                Runnable nextTask = nextTask();

                if (nextTask != null)
                {
                    executors.execute(nextTask);
                }
            }
            
        }
    }

    public Runnable nextTask()
    {
        try
        {
            return queue.take();
        }
        catch (InterruptedException e)
        {
            return null;
        }
    }

    public void addTask(Runnable task)
    {
        try
        {
            queue.put(task);
        }
        catch (InterruptedException e)
        {

        }
    }
}

该类的意思是使用Quartz每天凌晨0点触发,从数据库取出2000个数据,这样子类实现的方法getTaskList();就有2000个任务,然后调用start()方法。

每隔getCycle()分钟执行getPackageSize();个线程。

请问 schedulerExecutors.scheduleAtFixedRate(new TaskWorker(), 1, getCycle(), TimeUnit.MINUTES);这个方法是不是只调用一次就行了。
为什么过两三天机子CPU就占用了300%。对多线程这方法没有什么经验啊


20分
 private final static  ExecutorService executors = Executors.newCachedThreadPool();

这里的池子没有控制大小,每天0点任务调度的时候你把executors池子的线程数量打印出来看看。

还有你这里的定时任务只有一个,没有必要使用schedulerExecutors来做定时。可以采用Timer来做。

引用 1 楼 littlebrain4solving 的回复:

 private final static  ExecutorService executors = Executors.newCachedThreadPool();

这里的池子没有控制大小,每天0点任务调度的时候你把executors池子的线程数量打印出来看看。

还有你这里的定时任务只有一个,没有必要使用schedulerExecutors来做定时。可以采用Timer来做。

  那我怎么保证下一次0点调度的时候,之前的Timer是否还在运行呢?如果是的话?怎么重复利用呢?

那么你判断下。/
引用 2 楼 k10509806 的回复:
Quote: 引用 1 楼 littlebrain4solving 的回复:

 private final static  ExecutorService executors = Executors.newCachedThreadPool();

这里的池子没有控制大小,每天0点任务调度的时候你把executors池子的线程数量打印出来看看。

还有你这里的定时任务只有一个,没有必要使用schedulerExecutors来做定时。可以采用Timer来做。

  那我怎么保证下一次0点调度的时候,之前的Timer是否还在运行呢?如果是的话?怎么重复利用呢?

你为什么要知道上一次的Timer还在运行?难道任务执行12个小时还不够处理的吗?这种情况都会做超时处理!

你要知道一点,一个Timer下面可以调度N个TimerTask;你当然可以获取到Timer下面任务的数量。

需要知道的是Timer是单线程对定时处理!

1.process里不能再调用start了,start应该在系统启动的时候配置Listener调用一次就OK了。多次调用会导致存在多个定时任务。
2.没有看到你设置ExecutorService的核心进程与最大、最小进程数的限制,当大量任务存储进程池时可能会创建超出系统承受范围的进程数量。
3.你的定时器的用法有问题,根据你写的业务来看根本就没有用。你的taskQueue是每天0点时候才有的数据,而taskWorker每分钟执行的时候直接清空任务队列,这样推测一天你的定时器仅有一次触发是有效触发。 正确设计应该在addTask方法时直接executors.execute(nextTask);。取消定时器功能。
总结:
1.设置线程池的初始线程数、最小线程数、最大线程数。
2.取消掉start方法,取消该类中对定时器的使用。
3.检查确定每个task的执行时间与系统计算资源情况并调整线程池的相关参数。
4.你自己的queue变量在这么改后也没有用了。
引用 6 楼 zyb134506 的回复:

1.process里不能再调用start了,start应该在系统启动的时候配置Listener调用一次就OK了。多次调用会导致存在多个定时任务。
2.没有看到你设置ExecutorService的核心进程与最大、最小进程数的限制,当大量任务存储进程池时可能会创建超出系统承受范围的进程数量。
3.你的定时器的用法有问题,根据你写的业务来看根本就没有用。你的taskQueue是每天0点时候才有的数据,而taskWorker每分钟执行的时候直接清空任务队列,这样推测一天你的定时器仅有一次触发是有效触发。 正确设计应该在addTask方法时直接executors.execute(nextTask);。取消定时器功能。
总结:
1.设置线程池的初始线程数、最小线程数、最大线程数。
2.取消掉start方法,取消该类中对定时器的使用。
3.检查确定每个task的执行时间与系统计算资源情况并调整线程池的相关参数。
4.你自己的queue变量在这么改后也没有用了。

非常感谢你的详细分析。请再帮我解答下心中的疑惑,非常感谢
如果我取消了定时器的使用,一次过来了2000个任务是不是由线程池来负责任务调度,会不会因为阻塞过多出问题。


80分
引用 7 楼 k10509806 的回复:
Quote: 引用 6 楼 zyb134506 的回复:

1.process里不能再调用start了,start应该在系统启动的时候配置Listener调用一次就OK了。多次调用会导致存在多个定时任务。
2.没有看到你设置ExecutorService的核心进程与最大、最小进程数的限制,当大量任务存储进程池时可能会创建超出系统承受范围的进程数量。
3.你的定时器的用法有问题,根据你写的业务来看根本就没有用。你的taskQueue是每天0点时候才有的数据,而taskWorker每分钟执行的时候直接清空任务队列,这样推测一天你的定时器仅有一次触发是有效触发。 正确设计应该在addTask方法时直接executors.execute(nextTask);。取消定时器功能。
总结:
1.设置线程池的初始线程数、最小线程数、最大线程数。
2.取消掉start方法,取消该类中对定时器的使用。
3.检查确定每个task的执行时间与系统计算资源情况并调整线程池的相关参数。
4.你自己的queue变量在这么改后也没有用了。

非常感谢你的详细分析。请再帮我解答下心中的疑惑,非常感谢
如果我取消了定时器的使用,一次过来了2000个任务是不是由线程池来负责任务调度,会不会因为阻塞过多出问题。

把定时器里添加任务的代码放到addTask中的话就是由线程池来调度运行。至于阻塞问题,如果你的task是线程安全的,且不需要和其他Task竞争的话是不会有问题的,线程池会按顺序来排列Task的运行。


CodeBye 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明关于Executors.newScheduledThreadPool(1);批处理设计