Redis Pipeline(管道)java RedisTemplate hutool RedisDS

Redis Pipeline 简介

Redis 是一种基于客户端 - 服务端模型以及请求 / 响应的 TCP 服务。一次 Redis 客户端发起的请求,经过服务端的响应后,大致会经历如下的步骤:

  • 客户端发起一个(查询 / 插入)请求,并监听 socket 返回,通常情况都是阻塞模式等待 Redis 服务器的响应
  • 服务端处理命令,并且返回处理结果给客户端
  • 客户端接收到服务的返回结果,程序从阻塞代码处返回

Redis 客户端和服务端之间通过网络连接进行数据传输,这个连接可以很快(loopback 接口)或很慢(建立了一个多次跳转的网络连接)。无论网络延如何延时,数据包总是能从客户端到达服务器,并从服务器返回数据回复客户端,这个时间被称之为 RTT(Round Trip Time - 往返时间)。我们可以很容易就意识到,Redis 在连续请求服务端时,即使 Redis 每秒能处理 100k 请求,但也会因为网络传输花费大量时间,导致整体性能的下降。

因此如果遇到大量的批处理,我们可以考虑使用 Redis 的 pipeline(管道)。值得注意的是,管道技术并不是 Redis 特有的技术,管道技术往往需要客户端 - 服务器的共同配合,大部分工作任务其实是在客户端完成,很显然 Redis 支持管道技术,按照官网的意思,Redis 的最低版本就考虑了管道技术的支持性设计。

如下图,多个连续的 incr 指令,使用 pipeline(管道)后,多个连续的 incr 指令只会花费一次网络来回开销,这个开销会随着 n 数值的增大,大幅减少网络 io 开销,从而提升整体服务的性能。

Redis Pipeline 深究

在上述简介中,提到了管道技术优化的是网络传输的耗时时间,这里通过 Redis 客户端 - 服务端的一次完整的网络请求来回,深入探索 pipeline 的本质。

  • 客户端调用 write 将数据写入操作系统内核 (kernel) 为 socket 连接分配的发送缓冲区(send buffer)
  • 客户端操作系统内核将发送缓冲区 (send buffer) 的数据发送到网卡(NIC)
  • 网卡 (NIC) 将数据通过路由 (route) 将数据送到 Redis 服务器机器网卡(NIC)
  • 服务器操作系统内核 (kernel) 将网卡 (NIC) 接收的数据,写入内核为 socket 分配的接收缓冲区(recv buffer)
  • 服务器进程从接收缓冲区调用 read 读取数据,并进行数据逻辑处理
  • 数据处理完成之后,服务器进程调用 write 将响应数据写入操作系统内核为 socket 分配的发送缓冲区
  • 操作系统内核将发送缓冲区的数据发送到服务器网卡
  • 服务器网卡将响应数据通过路由发送到客户端网卡
  • 客户端网卡接收响应数据
  • 客户端操作系统内核读取网卡接收到的服务器响应数据,并写入操作系统为 socket 连接分配的介绍缓冲区
  • 客户端进程调用 read 从接收缓冲区中读取服务器响应数据
  • 一次完整网络请求来回过程结束
    对于 pipeline 技术而言,就是将 n * 12 个步骤,合并成 1 * 12,这样服务请求响应的总体时间将会大大的减少。

有个值得注意的点
在上述网络请求来回中,可能出现我们经常说到的 io 阻塞:

  • 当 write 操作发生,并且发送缓冲区(send buffer)满时,就会导致 write 操作阻塞
  • 当 read 操作发生,并且接收缓冲区(recv buffer)满时,就会导致 read 操作阻塞
    上述的这两个阻塞如果出现,将会导致整个请求时间变长,因此我们操作大批量指令的时候,比如 10k 个指令,我们可以合理的对指令分多次批量发送,这样可以减少出现阻塞的情况,也可以避免服务器响应一个过大的答复包,导致客户端内存负载过重。

Redis Pipeline benchmark 压测 pipeline

使用 Redis 提供的 benchmark 对 Redis 进行性能测试,
如过你是 Windows 下的 Redis,在安装目录下有个 redis-benchmark.exe,进入 cmd 命令模式测试即可

如果你是在 Linux 下的 redis,在安装目录的 src 目录下有个 redis-benchmark

redis-benchmark 的全部指令参数如下所示,我们这里测试 pipeline,需要使用 -P

指令名称 描述 默认值
-h 指定 Redis 服务器 hostname 127.0.0.1
-p 指定 Redis 服务器端口 6379
-s 指定 Redis 服务器 Server Socket
-a 指定 Redis 服务器密码
-c 指定客户端并发数 50
-n 指定总请求数 100000
-dbnum 指定 Redis 数据库 0
-k 1=keep alive 0=reconnect 1
-r 使用随机 key,value 对相关指令进行压测
-P 使用管道(pipeline) 1(no pipeline)
-q 强制退出 Redis,仅展示 query/sec
--csv 使用 CSV 格式输出
-l 循环运行测试
-t 运行逗号分隔的测试列表
-I Idle 模式,仅打开 N 个 idle 连接并等待

通过普通方式测试 set 指令和 pipeline 方式测试 set 指令,可以看到 Redis 服务不同的 QPS:

  • 普通 set 方式,Redis QPS 大概在 5.3 万左右
  • 当使用 pipeline set 时,随着管道内并行请求数量的增加,Redis QPS 可以达到 100 万以上

Redis Pipeline Jedis 使用 pipeline

测试代码

package com.liziba.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.io.IOException;

/**

  • 测试 pipeline
    */
    public class PipelineTest {

    public static void main(String[] args) throws IOException {
    Jedis client = new Jedis("127.0.0.1", 6379);

     <span class="hljs-type">long</span> <span class="hljs-variable">startPipe</span> <span class="hljs-operator">=</span> System.currentTimeMillis();
     <span class="hljs-type">Pipeline</span> <span class="hljs-variable">pipe</span> <span class="hljs-operator">=</span> client.pipelined();
     pipe.multi();
     <span class="hljs-keyword">for</span> (<span class="hljs-type">int</span> <span class="hljs-variable">i</span> <span class="hljs-operator">=</span> <span class="hljs-number">0</span>; i &lt; <span class="hljs-number">100000</span>; i++) {
         pipe.set(<span class="hljs-string">"pipe"</span> + i, i + <span class="hljs-string">""</span> );
     }
     pipe.exec();
     pipe.close();
     <span class="hljs-type">long</span> <span class="hljs-variable">endPipe</span> <span class="hljs-operator">=</span> System.currentTimeMillis();
     System.out.println(<span class="hljs-string">"pipeline set cost time : "</span> + (endPipe - startPipe) + <span class="hljs-string">"ms"</span>);
    
    
     <span class="hljs-keyword">for</span> (<span class="hljs-type">int</span> <span class="hljs-variable">i</span> <span class="hljs-operator">=</span> <span class="hljs-number">0</span>; i &lt; <span class="hljs-number">100000</span>; i++) {
         client.set(<span class="hljs-string">"normal"</span> + i, i + <span class="hljs-string">""</span>);
     }
     System.out.println(<span class="hljs-string">"normal set cost time : "</span> + (System.currentTimeMillis() - endPipe)+ <span class="hljs-string">"ms"</span>);
    

    }

}

测试结果

java RedisTemplate 使用 pipeline

redisTemplate.executePipelined(
	(RedisCallback<Object>) connection -> {
		connection.openPipeline();
		for (Long i = aLong; i < aLong + finalStep; i++) {
			// key3 runList  待处理队列
			connection.rPush(MIDUO_RUN_LIST.getBytes(StandardCharsets.UTF_8), i.toString().getBytes(StandardCharsets.UTF_8));
		}
		return null;
	}
);                      

java hutool Jedis 使用 pipeline

        Jedis jedis1 = RedisDS.create().getJedis();
        System.out.println("jedis1 =" + jedis1);
        long startPipe = System.currentTimeMillis();
        Pipeline pipelined = jedis1.pipelined();

        for (int i = 0; i < 1000L; i++) {
            pipelined.lpop("jedis1");
        }
    List&lt;Object&gt; objects = pipelined.<span class="hljs-keyword">syncAndReturnAll();

for (Object object : objects) {
System.out.println("object =" + object);
jedis1.rpush("jedis1", (String) object);

    }

    pipelined.<span class="hljs-keyword">close();

jedis1.close();