大纲
Spring Boot Web项目(含源码)ConcurrentLinkedQueue+ReentrantLock+Condition实现消费者功能(类似MQ队列)
业务场景
前台注册时,给用户的邮箱异步发送邮件用于激活账户.服务器资源紧张不能引入MQ队列来实现,所以用JUC写了个消费者功能废话不多说直接上代码
项目结构
代码
消费者核心代码
/**
* 描述: 消费者线程
* date: 2020/6/22 0022
**/
@Slf4j
public class ConsumerTask implements Runnable {
public void run() {
doConsumer();
}
private void doConsumer() {
ConsumerTaskHolder.lock.lock();
try{
while (true){
while (ConsumerTaskHolder.queue.size() == 0){
log.info("队列中无数据等待数据中......");
ConsumerTaskHolder.notEmpty.await();
}
//消费邮箱
String email = ConsumerTaskHolder.queue.poll();
log.info("收到邮箱:{},发送邮件......",email);
// 处理逻辑 发送邮件..... some action
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
ConsumerTaskHolder.lock.unlock();
}
}
}
生产者代码
/**
* 描述: 提供者主要用于给队列推送数据
* date: 2020/6/22 0022
**/
public class ProviderTask {
/**
* 给队列推送数据
* @param email
*/
public static void pushQueue(String email){
ConsumerTaskHolder.lock.lock();
try {
ConsumerTaskHolder.queue.add(email);
//唤醒消费者线程处理任务啦
ConsumerTaskHolder.notEmpty.signalAll();
}finally {
ConsumerTaskHolder.lock.unlock();
}
}
}
生产者和消费者共同使用的队列等资源代码
/**
* 描述: 消费者相关资源
* date: 2020/6/22 0022
**/
public class ConsumerTaskHolder {
public static Lock lock = new ReentrantLock();
public static Condition notEmpty = lock.newCondition();
/**
* 存放数据的队列
*/
public static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue();
}
项目启动加载线程池代码
/**
* 描述: 线程池启动加载
* date: 2020/6/22 0022
**/
@Slf4j
@Component
@Order(0)
public class ConsumerThreadPool implements ApplicationRunner {
/**
* 线程池
*/
public static final ExecutorService executorService = Executors.newFixedThreadPool(5);
public void run(ApplicationArguments args) {
log.info("加载线程池中.........");
try {
ConsumerTask consumerTask = new ConsumerTask();
executorService.submit(consumerTask);
log.info("线程池启动完成......");
} catch (Exception e) {
log.error("线程池启动报错了.......");
e.printStackTrace();
}
}
/**
* 项目销毁前执行
*/
@PreDestroy
public void destroy() {
try {
executorService.shutdown();//优雅的关闭
log.info("关闭线程池....");
// List<Runnable> runnables = executorService.shutdownNow();//直接关闭
// log.debug("未执行完的任务,"+runnables);
} catch (Exception e) {
log.error("线程池关闭失败...", e);
e.printStackTrace();
}
}
}
项目启动后如图
Controller和Service代码
@RequestMapping("/system")
@Controller
public class SystemController {
@Autowired
UserService userService;
@GetMapping("/{path}")
public String index(@PathVariable String path){
return path;
}
@ResponseBody
@PostMapping("/register")
public Map register(User user){
Map resultMap = new HashMap();
userService.register(user);
return resultMap;
}
}
@Slf4j
@Service
public class UserServiceImpl implements UserService {
public void register(User user) {
String email = user.getEmail();
//给队列推送邮箱账号过去,队列处理发送邮件
ProviderTask.pushQueue(email);
//some action
log.info("保存用户信息到数据库");
}
}
前台页面推送数据
控制台效果如图
队列正常消费接收到邮箱号就处理发送邮件.
总结
项目启动后消费者就会等待数据,生产者推送一个数据过去后唤醒消费者,消费者线程接收到唤醒信号去消费数据.反复如此,由于是Demo所以代码比较简单,实际业务中的问题自行完善即可.