说明
这里将自己定义实现一个消息队列,该消息队列可以手动配置队列容量,可以供生产者生产消息,可以供消费者消费消息。
1、自定义消息队列
消息类
@DatapublicclassMessage{// 消息IDprivatefinalint id;// 消息内容privatefinalObject message;publicMessage(finalint id,finalObject message){this.id= id;this.message= message;}}
消息队列类
根据面向对象的思想,消息队列需要提供put消息和take消息的方法。
@Slf4jpublicclassMessageQuene{/**
* 双向队列
*/privatefinalLinkedList<Message> queue=newLinkedList<>();/**
* 消息队列的容量,默认容量110
*/privateint capacity=10;publicMessageQuene(finalint capacity){this.capacity= capacity;}/**
* 向消息队列put消息
*/publicvoidput(finalMessage message)throwsInterruptedException{synchronized(this.queue){// 超出容量,则在这里等待消费者取走消息while(this.queue.size()>=this.capacity){
log.info("消息队列已满,等待消费者消费消息.....");this.queue.wait();}// 从尾部加,从头部取this.queue.addLast(message);this.queue.notifyAll();}}/**
* 从消息队列获取消息
*/publicMessagetake()throwsInterruptedException{synchronized(this.queue){while(this.queue.isEmpty()){
log.info("消息队列已空,等待生产者生产消息.....");// 这里将中断异常抛出this.queue.wait();}// 从尾部加,从头部取,取出后就从队列删除finalMessage message=this.queue.pollFirst();this.queue.notifyAll();return message;}}}
2、自定义消息队列测试
@Slf4jpublicclassMessageQueneTest{staticfinalMessageQuene messageQuene=newMessageQuene(2);publicstaticvoidmain(finalString[] args){// 消费者不断从队列中消费消息finalThread t1=newThread(MessageQueneTest::consumeMessage,"t1");finalThread t2=newThread(MessageQueneTest::consumeMessage,"t2");// 生产者,不断向队列中生产消息finalThread t3=newThread(MessageQueneTest::putMessage,"t3");
t1.start();
t2.start();
t3.start();}publicstaticvoidconsumeMessage(){try{while(true){finalMessage message= messageQuene.take();
log.info("消费者消费到消息,id = {},message = {}", message.getId(), message.getMessage());}}catch(finalInterruptedException e){
e.printStackTrace();}}publicstaticvoidputMessage(){try{while(true){finalString message= UUID.randomUUID().toString();finalint id=RandomUtils.nextInt();
log.info("生产者生产消息,id = {},message = {}", id, message);
messageQuene.put(newMessage(id, message));TimeUnit.SECONDS.sleep(1);}}catch(finalInterruptedException e){
e.printStackTrace();}}}
测试结果
11:27:58.787 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.794 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.802 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc
11:27:58.805 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQueneTest - 消费者消费到消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc
11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:59.809 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 773935934,message = 0fe599b8-bf43-41e7-9158-96a6d12b8a25