目录
- 生产消息服务
- 消费消息服务,定时任务
- 日志
- 测试
leftpush消息入队,rightpop对应,消息出队。
rightpop(redisconstant.mq_list, 0l, timeunit.seconds)阻塞出队,0表示永久阻塞
生产消息服务
@service public class redisservice { @autowired private redistemplate<string, string> redistemplate; public object publish() { orderdto dto = new orderdto(); dto.setid(1); dto.setcreatetime(new date()); dto.setmoney("12.34"); dto.setorderno("orderno1"); string s = json.tojsonstring(dto); listoperations<string, string> listoperations = redistemplate.opsforlist(); //leftpush和rightpop对应,左边入队,右边出队 listoperations.leftpush(redisconstant.mq_list, s); //因为出队是阻塞读取的,所以上一步入队后,数据立刻就被驱走了,下一步size=0 long size = listoperations.size(redisconstant.mq_list); list<string> list = new arraylist<>(); if (size != null && size > 0) { list = listoperations.range(redisconstant.mq_list, 0, size - 1); } return list; } }
测试
@restcontroller @requestmapping("redislist") public class redislistcontroller { @autowired private redisservice redisservice; @getmapping("publish") public object publish() { return redisservice.publish(); } }
消费消息服务,定时任务
@component public class redisconsumetask { @autowired private redisservice redisservice; @tasklock(redisconstant.consume_redis_list) @scheduled(cron = "0/10 * * * * ?") public void consumemqlist() { redisservice.consumemqlist(); } } @service @slf4j public class redisservice { @autowired private redistemplate<string, string> redistemplate; public void consumemqlist() { listoperations<string, string> listoperations = redistemplate.opsforlist(); //0时间,表示阻塞永久 //待机一小时后,再次发消息,消费不了了,阻塞有问题啊。还得轮寻啊 //string s = listoperations.rightpop(redisconstant.mq_list, 0l, timeunit.seconds); string s = listoperations.rightpop(redisconstant.mq_list); if (s == null) { return; } log.info("{} = {}", redisconstant.mq_list, s); orderdto dto = json.parseobject(s, orderdto.class); log.info("dto = {}", dto); } }
日志
@component @aspect public class tasklockaop { @autowired private redislockregistry redislockregistry; @around("execution(@tasklock * * (..))") public object taskaround(proceedingjoinpoint pjp) throws throwable { tasklock taskannotation = ((methodsignature)pjp.getsignature()).getmethod().getannotation(tasklock.class); string lockkey = taskannotation.value(); lock lock = redislockregistry.obtain(lockkey); try { lock.trylock(30l, timeunit.seconds); system.out.println("任务开始, " + lockkey + ", " + new date()); return pjp.proceed(); } finally { lock.unlock(); system.out.println("任务结束, " + lockkey + ", " + new date()); } } }
测试
http://localhost:9040/redislist/publish
[“{“createtime”:1574394538430,“id”:1,“money”:“12.34”,“orderno”:“orderno1”}”]
下面一直阻塞,任务开始了,不收到消息,永远不会结束。
阻塞有问题,改用轮询了。
先启动发送消息服务,发送消息。后启动消费消息服务,可以消费消息。这一点,比发布订阅要稳定。
关联项目https://github.com/mingwulipo/cloud-demo.git
到此这篇关于redis用list做消息队列的实现示例的文章就介绍到这了,更多相关redis list消息队列内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!