Contents
- I. Background
- II. Source code analysis
- 1. The RedissonLock#lock() method
- 2. A closer look at subscribe()
- 3. Back in subscribe(): the main logic is delegated to LockPubSub#subscribe()
- 4. The logic of PublishSubscribeService#subscribe
- III. Summary
I. Background
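Recently, while using Redisson distributed locks, I ran into a production issue. The cause turned out to be that subscriptionsPerConnection or subscriptionConnectionPoolSize was too small. The problem went away only after these settings were increased.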
II. Source code analysis
Below I walk through the source code to find out exactly which logic causes the problem.
1. The RedissonLock#lock() method
```java
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // Try to acquire the lock; ttl == null means it was acquired
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    // Subscribe to the lock-release event and block until the lock is released.
    // This avoids wasting resources on futile lock attempts.
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    // remaining code omitted
    try {
        // loop until the lock is acquired
        // ...
    } finally {
        // unsubscribe from the lock-release event
        unsubscribe(future, threadId);
    }
}
```
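The main logic, summarized:
- Get the current thread's ID.
- tryAcquire attempts to acquire the lock and returns a TTL.
- If the TTL is null, the lock was acquired and the method returns. Otherwise execution continues.
- this.subscribe(threadId) subscribes on behalf of the current thread and returns an RFuture.
- If the subscription does not complete within the configured timeout, the subscribe-timeout exception is thrown.
- Once subscribed, a while (true) loop keeps trying to acquire the lock.
- The finally block unsubscribes.
So the problem must lie in the subscribe() method.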
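To make this control flow concrete, here is a minimal, stdlib-only sketch of the steps above. It is not Redisson code:
- LockFlowSketch, awaitSubscription and the returned strings are hypothetical names.
- A CompletableFuture stands in for the RFuture returned by subscribe().
- A timed get() stands in for commandExecutor.syncSubscription(future).

It shows that a caller whose tryAcquire fails can only proceed once its subscription completes. If the subscription never completes, the wait ends in a subscribe-timeout exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the lock() flow; names are illustrative, not Redisson's API.
class LockFlowSketch {
    // Stand-in for syncSubscription(): block on the subscription, fail after timeoutMs.
    static void awaitSubscription(CompletableFuture<Void> subscription, long timeoutMs) {
        try {
            subscription.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Subscribe timeout: (" + timeoutMs + "ms)");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // ttl == null means tryAcquire succeeded; otherwise we must subscribe and wait first.
    static String lock(Long ttl, CompletableFuture<Void> subscription, long timeoutMs) {
        if (ttl == null) {
            return "acquired";                      // fast path: no subscription at all
        }
        awaitSubscription(subscription, timeoutMs); // may throw on timeout
        // The real method now loops on tryAcquire and unsubscribes in a finally block.
        return "subscribed";
    }

    public static void main(String[] args) {
        System.out.println(lock(null, new CompletableFuture<>(), 100));                   // acquired
        System.out.println(lock(30_000L, CompletableFuture.completedFuture(null), 100)); // subscribed
        try {
            lock(30_000L, new CompletableFuture<>(), 100); // nobody ever completes this future
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());           // Subscribe timeout: (100ms)
        }
    }
}
```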
2. A closer look at subscribe()
```java
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    // entryName format: "id:name"
    // channelName format: "redisson_lock__channel:name"
    return pubSub.subscribe(getEntryName(), getChannelName());
}
```
RedissonLock#pubSub is initialized in the RedissonLock constructor:
```java
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    // ...
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
```
In the MasterSlaveConnectionManager implementation, subscribeService is constructed as follows:
```java
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
    this(config, id);
    this.config = cfg;

    // initialization
    initTimer(cfg);
    initSingleEntry();
}

protected void initTimer(MasterSlaveServersConfig config) {
    int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
    Arrays.sort(timeouts);
    int minTimeout = timeouts[0];
    if (minTimeout % 100 != 0) {
        minTimeout = (minTimeout % 100) / 2;
    } else if (minTimeout == 100) {
        minTimeout = 50;
    } else {
        minTimeout = 100;
    }

    timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);

    connectionWatcher = new IdleConnectionWatcher(this, config);
    // `this` is the MasterSlaveConnectionManager instance; config is the MasterSlaveServersConfig instance
    subscribeService = new PublishSubscribeService(this, config);
}
```
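The tick-duration arithmetic in initTimer() is easy to misread. The sketch below lifts it into a pure function; TimerTickSketch and tickMillis are hypothetical names, and the body repeats the same branching.

Two worked examples:
- With retryInterval = 1500 and timeout = 3000, the smaller value is 1500. That is a multiple of 100 but not 100 itself, so the wheel ticks every 100 ms.
- A value such as 1550 falls into the first branch and yields (1550 % 100) / 2 = 25 ms.

```java
import java.util.Arrays;

// Pure-function copy of the tick computation in initTimer(); class and method names are hypothetical.
class TimerTickSketch {
    static int tickMillis(int retryInterval, int timeout) {
        int[] timeouts = new int[]{retryInterval, timeout};
        Arrays.sort(timeouts);
        int minTimeout = timeouts[0];                // the smaller of the two settings
        if (minTimeout % 100 != 0) {
            minTimeout = (minTimeout % 100) / 2;     // e.g. 1550 -> 50 / 2 = 25
        } else if (minTimeout == 100) {
            minTimeout = 50;
        } else {
            minTimeout = 100;                        // any other multiple of 100
        }
        return minTimeout;
    }

    public static void main(String[] args) {
        System.out.println(tickMillis(1500, 3000)); // 100
        System.out.println(tickMillis(3000, 100));  // 50
        System.out.println(tickMillis(1550, 3000)); // 25
    }
}
```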
The PublishSubscribeService constructor:
```java
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);

public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
    super();
    this.connectionManager = connectionManager;
    this.config = config;
    for (int i = 0; i < locks.length; i++) {
        // initialize an array of semaphores, each starting with 1 permit
        locks[i] = new AsyncSemaphore(1);
    }
}
```
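The constructor builds a fixed array of one-permit semaphores, and getSemaphore() (quoted in the next section) picks one by hashing the channel name. The sketch below reproduces that lock-striping idea with two assumptions made for illustration:
- java.util.concurrent.Semaphore is swapped in for Redisson's callback-based AsyncSemaphore.
- StripedSemaphores and the stripe count of 50 are hypothetical.

Taking Math.abs of the remainder, rather than of the hash itself, keeps the index in range even when hashCode() is negative, including Integer.MIN_VALUE.

```java
import java.util.concurrent.Semaphore;

// Hypothetical lock-striping sketch; java.util.concurrent.Semaphore replaces Redisson's AsyncSemaphore.
class StripedSemaphores {
    private final Semaphore[] locks;

    StripedSemaphores(int stripes) {
        locks = new Semaphore[stripes];
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new Semaphore(1); // one permit per stripe, like new AsyncSemaphore(1)
        }
    }

    // Same index formula as getSemaphore(): abs of the remainder is always in [0, stripes)
    int indexFor(String channelName) {
        return Math.abs(channelName.hashCode() % locks.length);
    }

    Semaphore get(String channelName) {
        return locks[indexFor(channelName)];
    }

    public static void main(String[] args) {
        StripedSemaphores stripes = new StripedSemaphores(50); // stripe count chosen for illustration
        String channel = "redisson_lock__channel:order";
        // the same channel name always lands on the same stripe
        System.out.println(stripes.indexFor(channel) + " " + (stripes.get(channel) == stripes.get(channel)));
    }
}
```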
3. Back in subscribe(): the main logic is delegated to LockPubSub#subscribe()
```java
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();

public RFuture<E> subscribe(String entryName, String channelName) {
    // Get the matching semaphore from PublishSubscribeService; the same channelName always maps to the same semaphore:
    // public AsyncSemaphore getSemaphore(ChannelName channelName) {
    //     return locks[Math.abs(channelName.hashCode() % locks.length)];
    // }
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {
        @Override
        public void run() {
            // If a RedissonLockEntry already exists, reuse the existing subscription
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
            }

            E value = createEntry(newPromise);
            value.acquire();

            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
            }

            // create the listener
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            // subscribe with the listener
            service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    // listener.run() is eventually executed
    semaphore.acquire(listener);
    listenerHolder.set(listener);

    return newPromise;
}
```
The AsyncSemaphore#acquire() method:
```java
public void acquire(Runnable listener) {
    acquire(listener, 1);
}

public void acquire(Runnable listener, int permits) {
    boolean run = false;

    synchronized (this) {
        // counter starts at 1
        if (counter < permits) {
            // not enough permits (not the first caller): queue the listener
            listeners.add(new Entry(listener, permits));
            return;
        } else {
            counter -= permits;
            run = true;
        }
    }

    // listener.run() executes immediately only when a permit was available
    if (run) {
        listener.run();
    }
}
```
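AsyncSemaphore does not block a thread. It either runs the callback right away or parks it until a permit is released. The following is a simplified, hypothetical re-implementation (MiniAsyncSemaphore, one permit per acquire) that shows this behavior. Its release() hands the permit directly to the next queued callback. That is a simplification and not necessarily how Redisson's release() is written.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified callback-based semaphore in the spirit of AsyncSemaphore.
class MiniAsyncSemaphore {
    private int counter;
    private final Deque<Runnable> listeners = new ArrayDeque<>();

    MiniAsyncSemaphore(int permits) {
        this.counter = permits;
    }

    void acquire(Runnable listener) {
        synchronized (this) {
            if (counter == 0) {
                listeners.add(listener); // no permit: park the callback, do not block the thread
                return;
            }
            counter--;
        }
        listener.run();                  // permit taken: run outside the lock
    }

    void release() {
        Runnable next;
        synchronized (this) {
            next = listeners.poll();
            if (next == null) {
                counter++;               // nobody waiting: return the permit
                return;
            }
        }
        next.run();                      // hand the permit straight to the next waiter
    }

    public static void main(String[] args) {
        MiniAsyncSemaphore sem = new MiniAsyncSemaphore(1);
        sem.acquire(() -> System.out.println("first caller runs immediately"));
        sem.acquire(() -> System.out.println("second caller runs after release"));
        sem.release(); // prints the second message
    }
}
```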
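To summarize the logic above:
1. Get the matching semaphore from PublishSubscribeService. The same channelName always maps to the same semaphore.
2. If this is the first request (a permit is available), listener.run() executes immediately. Otherwise the listener waits until the previous holder releases the semaphore.
3. If a RedissonLockEntry already exists, the existing subscription is reused.
4. If no RedissonLockEntry exists, a new one is created and the subscription is handed to PublishSubscribeService.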
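Steps 3 and 4 rely on ConcurrentMap.putIfAbsent, so that concurrent callers for the same entryName share one entry and only one of them issues the real subscription. Below is a hypothetical sketch of that pattern with two counters:
- An AtomicInteger counts how many callers share an entry, standing in for RedissonLockEntry.acquire().
- Another counts the subscriptions that would actually go to Redis.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one shared entry per entryName, so only the first caller subscribes.
class EntryRegistry {
    static final class Entry {
        final AtomicInteger holders = new AtomicInteger(); // stands in for RedissonLockEntry's counter
    }

    private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();
    final AtomicInteger redisSubscriptions = new AtomicInteger(); // subscriptions that would hit Redis

    Entry subscribe(String entryName) {
        Entry existing = entries.get(entryName);
        if (existing != null) {                 // step 3: reuse the existing subscription
            existing.holders.incrementAndGet();
            return existing;
        }
        Entry created = new Entry();            // step 4: try to register a new entry
        created.holders.incrementAndGet();
        Entry old = entries.putIfAbsent(entryName, created);
        if (old != null) {                      // lost the race: join the winner's entry
            old.holders.incrementAndGet();
            return old;
        }
        redisSubscriptions.incrementAndGet();   // only the winner subscribes
        return created;
    }

    public static void main(String[] args) {
        EntryRegistry registry = new EntryRegistry();
        registry.subscribe("id:lockA");
        registry.subscribe("id:lockA");
        registry.subscribe("id:lockB");
        System.out.println(registry.redisSubscriptions.get()); // 2
    }
}
```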
As the code shows, the main work is delegated to PublishSubscribeService#subscribe.
4. The logic of PublishSubscribeService#subscribe:
```java
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();

public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
    RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
    // Main entry point. Note that a new ChannelName object is created each time,
    // but ChannelName overrides hashCode() and equals().
    subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
    return promise;
}

private void subscribe(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
                       PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
    PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
    if (connEntry != null) {
        // An entry already exists for this channel: just add the listeners to it
        addListeners(channelName, promise, type, lock, connEntry, listeners);
        return;
    }

    // No entry yet: this is the important path
    freePubSubLock.acquire(new Runnable() {
        @Override
        public void run() {
            if (promise.isDone()) {
                lock.release();
                freePubSubLock.release();
                return;
            }

            // Peek at the head of the queue
            PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
            if (freeEntry == null) {
                // The first time there is none, so a connection must be created
                connect(codec, channelName, promise, type, lock, listeners);
                return;
            }

            // Otherwise try to take a slot; if none is left (-1), throw and stop
            int remainFreeAmount = freeEntry.tryAcquire();
            if (remainFreeAmount == -1) {
                throw new IllegalStateException();
            }

            PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
            if (oldEntry != null) {
                freeEntry.release();
                freePubSubLock.release();

                addListeners(channelName, promise, type, lock, oldEntry, listeners);
                return;
            }

            // If remainFreeAmount == 0, remove the entry from the queue
            if (remainFreeAmount == 0) {
                freePubSubConnections.poll();
            }
            freePubSubLock.release();

            // add the listeners
            RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);

            ChannelFuture future;
            if (PubSubType.PSUBSCRIBE == type) {
                future = freeEntry.psubscribe(codec, channelName);
            } else {
                future = freeEntry.subscribe(codec, channelName);
            }

            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        if (!promise.isDone()) {
                            subscribeFuture.cancel(false);
                        }
                        return;
                    }

                    connectionManager.newTimeout(new TimerTask() {
                        @Override
                        public void run(Timeout timeout) throws Exception {
                            subscribeFuture.cancel(false);
                        }
                    }, config.getTimeout(), TimeUnit.MILLISECONDS);
                }
            });
        }
    });
}

private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
                     PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
    // Compute the slot from channelName and get a pub/sub connection for it
    int slot = connectionManager.calcSlot(channelName.getName());
    RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
    promise.onComplete((res, e) -> {
        if (e != null) {
            ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e);
        }
    });

    connFuture.onComplete((conn, e) -> {
        if (e != null) {
            freePubSubLock.release();
            lock.release();
            promise.tryFailure(e);
            return;
        }

        // subscriptionsPerConnection is read from the configuration here
        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
        // Each call decrements the remaining subscriptionsPerConnection count until it reaches 0
        int remainFreeAmount = entry.tryAcquire();

        // If an entry already exists, release the new one and add the listeners to oldEntry
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            releaseSubscribeConnection(slot, entry);

            freePubSubLock.release();

            addListeners(channelName, promise, type, lock, oldEntry, listeners);
            return;
        }

        if (remainFreeAmount > 0) {
            // add to the queue of connections that still have free slots
            freePubSubConnections.add(entry);
        }
        freePubSubLock.release();

        RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);

        // The actual subscription happens here (talks to Redis)
        ChannelFuture future;
        if (PubSubType.PSUBSCRIBE == type) {
            future = entry.psubscribe(codec, channelName);
        } else {
            future = entry.subscribe(codec, channelName);
        }

        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    if (!promise.isDone()) {
                        subscribeFuture.cancel(false);
                    }
                    return;
                }

                connectionManager.newTimeout(new TimerTask() {
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        subscribeFuture.cancel(false);
                    }
                }, config.getTimeout(), TimeUnit.MILLISECONDS);
            }
        });
    });
}
```
The PubSubConnectionEntry#tryAcquire method: subscriptionsPerConnection is the maximum number of subscriptions per connection, and each tryAcquire call decrements the remaining count:
```java
public int tryAcquire() {
    while (true) {
        int value = subscribedChannelsAmount.get();
        if (value == 0) {
            return -1;
        }

        if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
            return value - 1;
        }
    }
}
```
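The compare-and-set loop guarantees that a connection never hands out more slots than subscriptionsPerConnection, even under contention. The sketch below wraps the same loop in a hypothetical SubscriptionSlots class and races several threads against it. With capacity 5 and 20 callers, exactly 5 get a slot and the rest see -1.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around the same CAS loop as PubSubConnectionEntry.tryAcquire().
class SubscriptionSlots {
    private final AtomicInteger subscribedChannelsAmount;

    SubscriptionSlots(int subscriptionsPerConnection) {
        subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection);
    }

    // Returns the remaining slots after taking one, or -1 if none were left.
    int tryAcquire() {
        while (true) {
            int value = subscribedChannelsAmount.get();
            if (value == 0) {
                return -1;
            }
            if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
                return value - 1;
            }
            // another thread changed the count in between: retry
        }
    }

    void release() {
        subscribedChannelsAmount.incrementAndGet();
    }

    // Starts `callers` threads that each call tryAcquire() once; returns how many obtained a slot.
    static int concurrentSuccesses(int capacity, int callers) {
        SubscriptionSlots slots = new SubscriptionSlots(capacity);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                if (slots.tryAcquire() >= 0) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(concurrentSuccesses(5, 20)); // 5
    }
}
```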
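To summarize the logic above:
1. Check for duplicates again: look up channelName in name2PubSubConnection to see whether a PubSubConnectionEntry is already subscribed. If so, add the new listener to that PubSubConnectionEntry directly.
2. Take a shared PubSubConnectionEntry from the freePubSubConnections queue. If there is none, call connect():
2.1 Create a PubSubConnectionEntry with capacity subscriptionsPerConnection, then call its tryAcquire(). Each call decrements the count by 1.
2.2 Put the new PubSubConnectionEntry into the global name2PubSubConnection map so it can be reused later.
2.3 If remainFreeAmount > 0, also add the PubSubConnectionEntry to the freePubSubConnections queue.
2.4 Then perform the underlying subscribe and addListeners.
3. If the queue already holds a PubSubConnectionEntry, call tryAcquire on that existing entry.
4. If tryAcquire returns -1 (no slots left), an IllegalStateException is thrown. If remainFreeAmount is 0, the entry is removed from the queue, so later requests obtain another available connection.
5. Finally, perform the underlying subscribe and addListeners.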
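Putting the steps together, the following single-threaded model shows how name2PubSubConnection and freePubSubConnections hand out slots. PubSubPoolModel is hypothetical, not Redisson code. In the model:
- A connection stays in the free queue until its last slot is taken.
- A new connection is opened only when the queue is empty.
- At most poolSize connections may be opened (an assumption of the model), so it can hold poolSize × perConnection distinct channels.
- Beyond that limit it simply returns false. In the scenario the article describes, the caller instead keeps waiting until the subscribe timeout fires.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical single-threaded model of name2PubSubConnection + freePubSubConnections; not Redisson code.
class PubSubPoolModel {
    static final class Conn {
        private int free;
        Conn(int capacity) { free = capacity; }
        // Same contract as PubSubConnectionEntry.tryAcquire(): remaining slots, or -1 if full
        int tryAcquire() { return free == 0 ? -1 : --free; }
    }

    private final int poolSize;       // assumed cap on open pub/sub connections
    private final int perConnection;  // plays the role of subscriptionsPerConnection
    private final Map<String, Conn> name2Conn = new HashMap<>();
    private final Queue<Conn> freeConns = new ArrayDeque<>();
    int opened = 0;

    PubSubPoolModel(int poolSize, int perConnection) {
        if (poolSize < 1 || perConnection < 1) {
            throw new IllegalArgumentException("poolSize and perConnection must be >= 1");
        }
        this.poolSize = poolSize;
        this.perConnection = perConnection;
    }

    // Returns false when no connection can take the channel.
    boolean subscribe(String channel) {
        if (name2Conn.containsKey(channel)) {
            return true;                       // step 1: already subscribed, reuse
        }
        Conn conn = freeConns.peek();
        if (conn == null) {                    // step 2: no shared connection with free slots
            if (opened == poolSize) {
                return false;                  // model's stand-in for "wait until subscribe timeout"
            }
            conn = new Conn(perConnection);
            opened++;
            if (conn.tryAcquire() > 0) {
                freeConns.add(conn);           // step 2.3: still has room, share it
            }
        } else if (conn.tryAcquire() == 0) {   // steps 3-4: take a slot on the shared connection
            freeConns.poll();                  // last slot taken: drop it from the free queue
        }
        name2Conn.put(channel, conn);          // step 2.2: remember channel -> connection
        return true;
    }

    public static void main(String[] args) {
        PubSubPoolModel model = new PubSubPoolModel(2, 5);
        int accepted = 0;
        for (int i = 0; i < 12; i++) {
            if (model.subscribe("ch" + i)) {
                accepted++;
            }
        }
        System.out.println(accepted + " channels on " + model.opened + " connections"); // 10 channels on 2 connections
    }
}
```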
III. Summary
Root cause: the analysis above shows that PublishSubscribeService shares connections through the freePubSubConnections queue. If a single key receives more simultaneous requests than subscriptionsPerConnection allows (default 5), remainFreeAmount can become -1. commandExecutor.syncSubscription(future) then waits until it times out and throws: "Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."
Solution: when initializing Redisson, set subscriptionsPerConnection and/or subscriptionConnectionPoolSize to larger values.
For an explanation of these parameters and their default values, see the official documentation: https://github.com/redisson/redisson/wiki/2.-configuration#23-common-settings