Redis Stream类型的使用详解

目录
  • 一、背景
  • 二、redis中stream类型的特点
  • 三、stream的结构
  • 四、stream的命令
    • 1、xadd 往stream末尾添加消息
      • 1、命令格式
      • 2、举例
    • 2、xrange查看stream中的消息
      • 1、命令格式
      • 2、准备数据
      • 3、举例
    • 3、xrevrange反向查看stream中的消息
      • 4、xdel删除消息
        • 1、命令格式
        • 2、准备数据
        • 3、举例
      • 5、xlen查看stream中元素的长度
        • 1、命令格式
        • 2、举例
      • 6、xtrim对stream中的元素进行修剪
        • 1、命令格式
        • 2、准备数据
        • 3、举例
      • 7、xread独立消费消息
        • 1、命令格式
        • 2、准备数据
        • 3、举例
      • 8、消费者组相关操作
        • 1、消费者组命令
        • 2、准备数据
        • 3、创建消费者组
        • 4、创建一个从某个消息之后消费的消费者组
        • 6、一些监控命令
    • 五、参考文档

      一、背景

      最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。

      二、redis中stream类型的特点

      • 是可持久化的,可以保证数据不丢失。
      • 支持消息的多播、分组消费。
      • 支持消息的有序性。

      三、stream的结构

      解释:

      消费者组: consumer group,即使用 xgroup create 命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。

      • 同一条消息,只能被这个消费者组中的某个消费者获取。
      • 多个消费者之间是相互独立的,互不干扰。

      消费者: consumer 消费消息。

      last_delivered_id: 这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动一位,保证消费者不会读取到重复的消息。

      pending_ids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。

      消息内容:是一个键值对的格式。

      stream 中 消息的 id: 默认情况下,id使用 * ,redis可以自动生成一个,格式为 时间戳-序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。

      四、stream的命令

      1、xadd 往stream末尾添加消息

      1、命令格式

      xadd key [nomkstream] [maxlen|minid [=|~] threshold [limit count]] *|id field value [field value ...]

      2、举例

      xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)

      1、向流中增加一条数据,

      127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中增加一个 username 是zhangsan的数据 *表示自动生成id
      "1635999858912-0" # 返回的是id
      127.0.0.1:6379> keys *
      1) "stream-key" # 可以看到stream自动创建了
      127.0.0.1:6379>

      2、向流中增加数据,不自动创建流

      127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以加入失败
      (nil)
      127.0.0.1:6379> keys *
      (empty array)
      127.0.0.1:6379>

      3、手动指定id的值

      127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是自己传递的1-1,而不是使用*自动生成
      "1-1" # 返回的是id的值
      127.0.0.1:6379>

      4、设置一个固定大小的stream1、精确指定stream的大小

      指定指定stream的大小比模糊指定stream的大小会稍微多少消耗一些性能。

      2、模糊指定stream的大小

      127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
      "1636001034141-0"
      127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second
      "1636001044506-0"
      127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
      "1636001057846-0"
      127.0.0.1:6379> xinfo stream stream-key
       1) "length"
       2) (integer) 3
       3) "radix-tree-keys"
       4) (integer) 1
       5) "radix-tree-nodes"
       6) (integer) 2
       7) "last-generated-id"
       8) "1636001057846-0"
       9) "groups"
      10) (integer) 0
      11) "first-entry"
      12) 1) "1636001034141-0"
          2) 1) "first"
             2) "first"
      13) "last-entry"
      14) 1) "1636001057846-0"
          2) 1) "third"
             2) "third"
      127.0.0.1:6379>

      ~ 模糊指定流的大小,可以看到指定的是1,实际上已经到了3.

      2、xrange查看stream中的消息

      1、命令格式

      xrange key start end [count count]

      2、准备数据

      127.0.0.1:6379> multi
      ok
      127.0.0.1:6379(tx)> xadd stream-key * username zhangsan
      queued
      127.0.0.1:6379(tx)> xadd stream-key * username lisi
      queued
      127.0.0.1:6379(tx)> exec
      1) "1636003481706-0"
      2) "1636003481706-1"
      127.0.0.1:6379> xadd stream-key * username wangwu
      "1636003499055-0"
      127.0.0.1:6379>

      使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样

      3、举例

      1、获取所有的数据(-+的使用)

      127.0.0.1:6379> xrange stream-key - +
      1) 1) "1636003481706-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636003481706-1"
         2) 1) "username"
            2) "lisi"
      3) 1) "1636003499055-0"
         2) 1) "username"
            2) "wangwu"
      127.0.0.1:6379>

      -: 表示最小id的值

      +:表示最大id的值

      2、获取指定id范围内的数据,闭区间

      127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
      1) 1) "1636003481706-1"
         2) 1) "username"
            2) "lisi"
      2) 1) "1636003499055-0"
         2) 1) "username"
            2) "wangwu"
      127.0.0.1:6379>

      3、获取指定id范围内的数据,开区间

      127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
      1) 1) "1636003481706-1"
         2) 1) "username"
            2) "lisi"
      127.0.0.1:6379>

      (:表示开区间

      4、获取某个毫秒后所有的数据

      127.0.0.1:6379> xrange stream-key 1636003481706 +
      1) 1) "1636003481706-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636003481706-1"
         2) 1) "username"
            2) "lisi"
      3) 1) "1636003499055-0"
         2) 1) "username"
            2) "wangwu"
      127.0.0.1:6379>

      直接写毫秒不写后面的序列号即可。

      5、获取单条数据

      127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
      1) 1) "1636003499055-0"
         2) 1) "username"
            2) "wangwu"
      127.0.0.1:6379>

      startend的值写的一样即可获取单挑数据。

      6、获取固定条数的数据

      127.0.0.1:6379> xrange stream-key - + count 1
      1) 1) "1636003481706-0"
         2) 1) "username"
            2) "zhangsan"
      127.0.0.1:6379>

      使用 count进行限制

      3、xrevrange反向查看stream中的消息

      xrevrange key end start [count count]

      使用方式和xrange类似,略。

      4、xdel删除消息

      1、命令格式

      xdel key id [id ...]

      2、准备数据

      127.0.0.1:6379> xadd stream-key * username zhangsan
      "1636004176924-0"
      127.0.0.1:6379> xadd stream-key * username lisi
      "1636004183638-0"
      127.0.0.1:6379> xadd stream-key * username wangwu
      "1636004189211-0"
      127.0.0.1:6379>

      3、举例

      需求:往stream中加入3条消息,然后删除第2条消息

      127.0.0.1:6379> xdel stream-key 1636004183638-0
      (integer) 1 # 返回的是删除记录的数量
      127.0.0.1:6379> xrang stream -key - +
      127.0.0.1:6379> xrange stream-key - +
      1) 1) "1636004176924-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636004189211-0"
         2) 1) "username"
            2) "wangwu"
      127.0.0.1:6379>

      注意:

      需要注意的是,我们从stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个stream并不会被删除。

      5、xlen查看stream中元素的长度

      1、命令格式

      xlen key

      2、举例

      查看stream中元素的长度

      127.0.0.1:6379> xadd stream-key * username zhangsan
      "1636004690578-0"
      127.0.0.1:6379> xlen stream-key
      (integer) 1
      127.0.0.1:6379> xlen not-exists-stream-key
      (integer) 0
      127.0.0.1:6379>

      注意:

      如果xlen后方的key不存在则返回0,否则返回元素的个数。

      6、xtrim对stream中的元素进行修剪

      1、命令格式

      xtrim key maxlen|minid [=|~] threshold [limit count]

      2、准备数据

      127.0.0.1:6379>  xadd stream-key * username zhangsan
      "1636009745401-0"
      127.0.0.1:6379> multi
      ok
      127.0.0.1:6379(tx)> xadd stream-key * username lisi
      queued
      127.0.0.1:6379(tx)> xadd stream-key * username wangwu
      queued
      127.0.0.1:6379(tx)> exec
      1) "1636009763955-0"
      2) "1636009763955-1"
      127.0.0.1:6379> xadd stream-key * username zhaoliu
      "1636009769625-0"
      127.0.0.1:6379>

      3、举例

      1、maxlen精确限制

      127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2个消息
      (integer) 2
      127.0.0.1:6379> xrange stream-key - + # 可以看到之前加入的2个消息被删除了
      1) 1) "1636009763955-1"
         2) 1) "username"
            2) "wangwu"
      2) 1) "1636009769625-0"
         2) 1) "username"
            2) "zhaoliu"
      127.0.0.1:6379>

      上方的意思是,保留stream-key这个stream中最后的2个消息。

      2、minid模糊限制

      minid 是删除比这个id小的数据,本地测试的时候没有测试出来,略。

      7、xread独立消费消息

      xread只是读取消息,读取完之后并不会删除消息。 使用xread读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。

      1、命令格式

      xread [count count] [block milliseconds] streams key [key ...] id [id ...]

      2、准备数据

      127.0.0.1:6379> xadd stream-key * username zhangsan"1636011801365-0"127.0.0.1:6379> xadd stream-key * username lisi"1636011806261-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636011810905-0"127.0.0.1:6379>

      3、举例

      1、获取用户名是wangwu的数据

      127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据需要是 > 1636011806261-0
      1) 1) "stream-key"
         2) 1) 1) "1636011810905-0"
               2) 1) "username"
                  2) "wangwu"

      2、获取2条数据

      127.0.0.1:6379> xread count 2 streams stream-key 0-0
      1) 1) "stream-key"
         2) 1) 1) "1636011801365-0"
               2) 1) "username"
                  2) "zhangsan"
            2) 1) "1636011806261-0"
               2) 1) "username"
                  2) "lisi"
      127.0.0.1:6379>

      count限制单次读取最后的消息,因为当前读取可能没有这么多。

      3、非阻塞读取stream对尾的数据

      即读取队列尾的下一个消息,在非阻塞模式下始终是nil

      127.0.0.1:6379> xread streams stream-key $
      (nil)

      4、阻塞读取stream对尾的数据

      注意:

      • $表示读取队列最新进来的一个消息,不是stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。
      • block 0表示永久阻塞,当消息到来时,才接触阻塞。block 1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nil
      • xread进行顺序消费 当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。
      • xread读取消息,完全无视消费组,此时stream就可以理解为一个普通的list。

      8、消费者组相关操作

      1、消费者组命令

      2、准备数据

      1、创建stream的名称是 stream-key

      2、创建2个消息,aa和bb

      127.0.0.1:6379> xadd stream-key * aa aa
      "1636362619125-0"
      127.0.0.1:6379> xadd stream-key * bb bb
      "1636362623191-0"

      3、创建消费者组

      1、创建一个从头开始消费的消费者组

      xgroup create stream-key(stream 名) g1(消费者组名) 0-0(表示从头开始消费)

      2、创建一个从stream最新的一个消息消费的消费者组

      xgroup create stream-key g2 $

      $表示从最后一个元素消费,不包括stream中的最后一个元素,即消费最新的消息。

      4、创建一个从某个消息之后消费的消费者组

      xgroup create stream-key g3 1636362619125-0  #1636362619125-0 这个是上方aa消息的id的值

      1636362619125-0某个消息的具体的id,这个g3消费者组中的消息都是大于>这个id的消息。

      3、从消费者中读取消息

      127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取)
      1) 1) "stream-key"
         2) 1) 1) "1636362619125-0"
               2) 1) "aa"
                  2) "aa"
            2) 1) "1636362623191-0"
               2) 1) "bb"
                  2) "bb"
      127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
      (nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息
      127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。
      1) 1) "stream-key"
         2) 1) 1) "1636362623191-0"
               2) 1) "bb"
                  2) "bb"
      127.0.0.1:6379>

      4、读取消费者的pending消息

      127.0.0.1:6379> xgroup create stream-key g4 0-0
      ok
      127.0.0.1:6379> xinfo consumers stream-key g1
      1) 1) "name"
         2) "c1"
         3) "pending"
         4) (integer) 2
         5) "idle"
         6) (integer) 88792
      127.0.0.1:6379> xinfo consumers stream-key g4
      (empty array)
      127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
      1) 1) "stream-key"
         2) 1) 1) "1636362623191-0"
               2) 1) "bb"
                  2) "bb"
      127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
      1) 1) "stream-key"
         2) (empty array)
      127.0.0.1:6379>

      5、转移消费者的消息

      127.0.0.1:6379> xpending stream-key g1 - + 10 c1
      1) 1) "1636362619125-0"
         2) "c1"
         3) (integer) 2686183
         4) (integer) 1
      2) 1) "1636362623191-0"
         2) "c1"
         3) (integer) 102274
         4) (integer) 7
      127.0.0.1:6379> xpending stream-key g1 - + 10 c2
      (empty array)
      127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
      1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
      127.0.0.1:6379> xpending stream-key g1 - + 10 c2
      1) 1) "1636362623191-0"
         2) "c2"
         3) (integer) 17616
         4) (integer) 8
      127.0.0.1:6379>

      也可以通过xautoclaim来实现。

      6、一些监控命令

      1、查看消费组中消费者的pending消息

      127.0.0.1:6379> xpending stream-key g1 - + 10 c2
      1) 1) "1636362623191-0"
         2) "c2"
         3) (integer) 1247680
         4) (integer) 8
      127.0.0.1:6379>

      2、查看消费组中的消费者信息

      127.0.0.1:6379> xinfo consumers stream-key g1
      1) 1) "name"
         2) "c1"
         3) "pending"
         4) (integer) 1
         5) "idle"
         6) (integer) 1474864
      2) 1) "name"
         2) "c2"
         3) "pending"
         4) (integer) 1
         5) "idle"
         6) (integer) 1290069
      127.0.0.1:6379>

      3、查看消费组信息

      127.0.0.1:6379> xinfo groups stream-key
      1) 1) "name"
         2) "g1"
         3) "consumers"
         4) (integer) 2
         5) "pending"
         6) (integer) 2
         7) "last-delivered-id"
         8) "1636362623191-0"
      2) 1) "name"
         2) "g2"
         3) "consumers"
      ......

      4、查看stream信息

      127.0.0.1:6379> xinfo stream stream-key
       1) "length"
       2) (integer) 2
       3) "radix-tree-keys"
       4) (integer) 1
       5) "radix-tree-nodes"
       6) (integer) 2
       7) "last-generated-id"
       8) "1636362623191-0"
       9) "groups"
      10) (integer) 4
      11) "first-entry"
      12) 1) "1636362619125-0"
          2) 1) "aa"
             2) "aa"
      13) "last-entry"
      14) 1) "1636362623191-0"
          2) 1) "bb"
             2) "bb"
      127.0.0.1:6379>

      五、参考文档

      1、https://redis.io/topics/streams-intro

      2、https://www.runoob.com/redis/redis-stream.html

      到此这篇关于redis stream类型的使用详解的文章就介绍到这了,更多相关redis stream类型内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

      (0)
      上一篇 2022年3月21日
      下一篇 2022年3月21日

      相关推荐