redis用list做消息队列的实现示例

目录
  • 生产消息服务
  • 消费消息服务,定时任务
  • 日志
  • 测试

leftpush消息入队,rightpop对应,消息出队。

rightpop(redisconstant.mq_list, 0l, timeunit.seconds)阻塞出队,0表示永久阻塞

生产消息服务

@service
public class redisservice {
    @autowired
    private redistemplate<string, string> redistemplate;


    public object publish() {
        orderdto dto = new orderdto();
        dto.setid(1);
        dto.setcreatetime(new date());
        dto.setmoney("12.34");
        dto.setorderno("orderno1");
        string s = json.tojsonstring(dto);

        listoperations<string, string> listoperations = redistemplate.opsforlist();
        //leftpush和rightpop对应,左边入队,右边出队
        listoperations.leftpush(redisconstant.mq_list, s);

        //因为出队是阻塞读取的,所以上一步入队后,数据立刻就被驱走了,下一步size=0
        long size = listoperations.size(redisconstant.mq_list);
        list<string> list = new arraylist<>();
        if (size != null && size > 0) {
             list = listoperations.range(redisconstant.mq_list, 0, size - 1);
        }
        return list;

    }
}

测试

@restcontroller
@requestmapping("redislist")
public class redislistcontroller {

    @autowired
    private redisservice redisservice;

    @getmapping("publish")
    public object publish() {
        return redisservice.publish();
    }
}

消费消息服务,定时任务

@component
public class redisconsumetask {
    @autowired
    private redisservice redisservice;

    @tasklock(redisconstant.consume_redis_list)
    @scheduled(cron = "0/10 * * * * ?")
    public void consumemqlist() {
        redisservice.consumemqlist();
    }
}

@service
@slf4j
public class redisservice {

    @autowired
    private redistemplate<string, string> redistemplate;

    public void consumemqlist() {
        listoperations<string, string> listoperations = redistemplate.opsforlist();
        //0时间,表示阻塞永久
        //待机一小时后,再次发消息,消费不了了,阻塞有问题啊。还得轮寻啊
        //string s = listoperations.rightpop(redisconstant.mq_list, 0l, timeunit.seconds);
        string s = listoperations.rightpop(redisconstant.mq_list);
        if (s == null) {
            return;
        }

        log.info("{} = {}", redisconstant.mq_list, s);

        orderdto dto = json.parseobject(s, orderdto.class);
        log.info("dto = {}", dto);
    }
}

日志

@component
@aspect
public class tasklockaop {

    @autowired
    private redislockregistry redislockregistry;

    @around("execution(@tasklock * * (..))")
    public object taskaround(proceedingjoinpoint pjp) throws throwable {

        tasklock taskannotation = ((methodsignature)pjp.getsignature()).getmethod().getannotation(tasklock.class);

        string lockkey = taskannotation.value();
        lock lock = redislockregistry.obtain(lockkey);
        try {
            lock.trylock(30l, timeunit.seconds);
            system.out.println("任务开始, " + lockkey + ", " + new date());

            return pjp.proceed();

        } finally {
            lock.unlock();
            system.out.println("任务结束, " + lockkey + ", " + new date());
        }
    }
}

测试

http://localhost:9040/redislist/publish

[“{“createtime”:1574394538430,“id”:1,“money”:“12.34”,“orderno”:“orderno1”}”]

下面一直阻塞,任务开始了,不收到消息,永远不会结束。
阻塞有问题,改用轮询了。

先启动发送消息服务,发送消息。后启动消费消息服务,可以消费消息。这一点,比发布订阅要稳定。

关联项目https://github.com/mingwulipo/cloud-demo.git

到此这篇关于redis用list做消息队列的实现示例的文章就介绍到这了,更多相关redis list消息队列内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐