博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java 使用Queue在队列中异步执行任务
阅读量:4314 次
发布时间:2019-06-06

本文共 5028 字,大约阅读时间需要 16 分钟。

先创建一个总的Handler(队列统一处理接口),名字就叫做 QueueTaskHandler

public interface QueueTaskHandler {    void processData();}

然后写一个队列服务类,就不多做说明了,我的注释已经写的很清楚了

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;@Componentpublic class QueueGenerationService{    // 日志监控    private static final Logger log = LoggerFactory.getLogger(QueueGenerationService.class);    // 根据业务与服务器性能自行配置 这里我配置的是最多50000个任务    // LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE    private final LinkedBlockingQueue
tasks = new LinkedBlockingQueue
(50000); // 类似于一个线程总管 保证所有的任务都在队列之中 private ExecutorService service = Executors.newSingleThreadExecutor(); // 检查服务是否运行 private volatile boolean running = true; //线程状态 private Future
serviceThreadStatus = null; @PostConstruct public void init() { serviceThreadStatus = service.submit(new Thread(new Runnable() { @Override public void run() { while (running) { try { //开始一个任务 QueueTaskHandler task = tasks.take(); try { task.processData(); } catch (Exception e) { log.error("任务处理发生错误", e); } } catch (InterruptedException e) { log.error("服务停止,退出", e); running = false; } } } }, "save data thread")); } public boolean addData(QueueTaskHandler dataHandler) { if (!running) { log.warn("service is stop"); return false; } //offer 队列已经满了,无法再加入的情况下 boolean success = tasks.offer(dataHandler); if (!success) { log.warn("添加任务到队列失败"); } return success; }   //判断队列是否有任务     public boolean isEmpty() {
      return tasks.isEmpty();     } public boolean checkServiceRun() { return running && !service.isShutdown() && !serviceThreadStatus.isDone(); } public void activeService() { running = true; if (service.isShutdown()) { service = Executors.newSingleThreadExecutor(); init(); log.info("线程池关闭,重新初始化线程池及任务"); } if (serviceThreadStatus.isDone()) { init(); log.info("线程池任务结束,重新初始化任务"); } } @PreDestroy public void destory() { running = false; service.shutdownNow(); }}

接下来就可以开始写你的业务Handler了

public class TestServiceHandler implements QueueTaskHandler {    // ******* start 这一段并不是必要的,这是示范一个传值的方式    private String name;    private Integer age;    public TestServiceHandler(String name) {    this.name = name;    }    public TestServiceHandler(Integer age) {    this.age = age;    }    public TestServiceHandler(String name, Integer age) {    this.name = name;    this.age = age;    }    // ****** end    // 这里也就是我们实现QueueTaskHandler的处理接口    @Override    public void processData() {    // 可以去做你想做的业务了    // 这里需要引用spring的service的话,我写了一个工具类,下面会贴出来    // ItestService testService = SpringUtils.getBean(ItestService.class);    System.out.println("name > " + name + "," + "age > " + age);    }}

那么我们来在service中添加一个任务

// 这里注入队列服务     @Autowired    private QueueGenerationService queueGenerationService;
  // 在方法中调用与传参的方式   queueGenerationService.addData(new TestServiceHandler("小明",5));
 

整个过程就结束了,然后在你的业务Handler中如果需要使用其他的bean比如service,那么请试试我写的这个工具类

import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;@Componentpublic class SpringUtils implements ApplicationContextAware {    private static ApplicationContext applicationContext;    /**     * @return     * @Description 获取applicationContext     */    public static ApplicationContext getApplicationContext() {        return applicationContext;    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        if (SpringUtils.applicationContext == null) {            SpringUtils.applicationContext = applicationContext;        }    }    /**     * @param name     * @return     * @Description 通过name获取 Bean.     */    public static Object getBean(String name) {        return getApplicationContext().getBean(name);    }    /**     * @param clazz     * @return     * @Description 通过class获取Bean.     */    public static 
T getBean(Class
clazz) { return getApplicationContext().getBean(clazz); } /** * @param name * @param clazz * @return * @Description 通过name, 以及Clazz返回指定的Bean */ public static
T getBean(String name, Class
clazz) { return getApplicationContext().getBean(name, clazz); }}

 如果大家有什么不解,或意见,欢迎在下方留言,楼主看到就会回复的,谢谢。

转载于:https://www.cnblogs.com/-renyu/p/10594257.html

你可能感兴趣的文章
VS无法启动调试:“生成下面的模块时,启用了优化或没有调试信息“
查看>>
npm 安装 sass=-=-=
查看>>
WINFORM中加入WPF控件并绑定数据源实现跨线程自动更新
查看>>
C#类对象的事件定义
查看>>
各类程序员学习路线图
查看>>
HDU 5510 Bazinga KMP
查看>>
关于select @@IDENTITY的初识
查看>>
ASP.NET MVC ajax提交 防止CSRF攻击
查看>>
关于CSS伪类选择器
查看>>
适用于带文字 和图片的垂直居中方法
查看>>
Part 2 - Fundamentals(4-10)
查看>>
使用Postmark测试后端存储性能
查看>>
NSTextView 文字链接的定制化
查看>>
第五天站立会议内容
查看>>
CentOs7安装rabbitmq
查看>>
(转))iOS App上架AppStore 会遇到的坑
查看>>
解决vmware与主机无法连通的问题
查看>>
做好产品
查看>>
项目管理经验
查看>>
笔记:Hadoop权威指南 第8章 MapReduce 的特性
查看>>