Redis Pipeline(管道)java RedisTemplate hutool RedisDS
Redis Pipeline 简介
Redis 是一种基于客户端 - 服务端模型以及请求 / 响应的 TCP 服务。一次 Redis 客户端发起的请求,经过服务端的响应后,大致会经历如下的步骤:
- 客户端发起一个(查询 / 插入)请求,并监听 socket 返回,通常情况都是阻塞模式等待 Redis 服务器的响应
- 服务端处理命令,并且返回处理结果给客户端
- 客户端接收到服务的返回结果,程序从阻塞代码处返回
Redis 客户端和服务端之间通过网络连接进行数据传输,这个连接可以很快(loopback 接口)或很慢(建立了一个多次跳转的网络连接)。无论网络延如何延时,数据包总是能从客户端到达服务器,并从服务器返回数据回复客户端,这个时间被称之为 RTT(Round Trip Time - 往返时间)。我们可以很容易就意识到,Redis 在连续请求服务端时,即使 Redis 每秒能处理 100k 请求,但也会因为网络传输花费大量时间,导致整体性能的下降。
因此如果遇到大量的批处理,我们可以考虑使用 Redis 的 pipeline(管道)。值得注意的是,管道技术并不是 Redis 特有的技术,管道技术往往需要客户端 - 服务器的共同配合,大部分工作任务其实是在客户端完成,很显然 Redis 支持管道技术,按照官网的意思,Redis 的最低版本就考虑了管道技术的支持性设计。
如下图,多个连续的 incr 指令,使用 pipeline(管道)后,多个连续的 incr 指令只会花费一次网络来回开销,这个开销会随着 n 数值的增大,大幅减少网络 io 开销,从而提升整体服务的性能。
Redis Pipeline 深究
在上述简介中,提到了管道技术优化的是网络传输的耗时时间,这里通过 Redis 客户端 - 服务端的一次完整的网络请求来回,深入探索 pipeline 的本质。
- 客户端调用 write 将数据写入操作系统内核 (kernel) 为 socket 连接分配的发送缓冲区(send buffer)
- 客户端操作系统内核将发送缓冲区 (send buffer) 的数据发送到网卡(NIC)
- 网卡 (NIC) 将数据通过路由 (route) 将数据送到 Redis 服务器机器网卡(NIC)
- 服务器操作系统内核 (kernel) 将网卡 (NIC) 接收的数据,写入内核为 socket 分配的接收缓冲区(recv buffer)
- 服务器进程从接收缓冲区调用 read 读取数据,并进行数据逻辑处理
- 数据处理完成之后,服务器进程调用 write 将响应数据写入操作系统内核为 socket 分配的发送缓冲区
- 操作系统内核将发送缓冲区的数据发送到服务器网卡
- 服务器网卡将响应数据通过路由发送到客户端网卡
- 客户端网卡接收响应数据
- 客户端操作系统内核读取网卡接收到的服务器响应数据,并写入操作系统为 socket 连接分配的介绍缓冲区
- 客户端进程调用 read 从接收缓冲区中读取服务器响应数据
- 一次完整网络请求来回过程结束
对于 pipeline 技术而言,就是将 n * 12 个步骤,合并成 1 * 12,这样服务请求响应的总体时间将会大大的减少。
有个值得注意的点:
在上述网络请求来回中,可能出现我们经常说到的 io 阻塞:
- 当 write 操作发生,并且发送缓冲区(send buffer)满时,就会导致 write 操作阻塞
- 当 read 操作发生,并且接收缓冲区(recv buffer)满时,就会导致 read 操作阻塞
上述的这两个阻塞如果出现,将会导致整个请求时间变长,因此我们操作大批量指令的时候,比如 10k 个指令,我们可以合理的对指令分多次批量发送,这样可以减少出现阻塞的情况,也可以避免服务器响应一个过大的答复包,导致客户端内存负载过重。
Redis Pipeline benchmark 压测 pipeline
使用 Redis 提供的 benchmark 对 Redis 进行性能测试,
如过你是 Windows 下的 Redis,在安装目录下有个 redis-benchmark.exe,进入 cmd 命令模式测试即可
如果你是在 Linux 下的 redis,在安装目录的 src 目录下有个 redis-benchmark
redis-benchmark 的全部指令参数如下所示,我们这里测试 pipeline,需要使用 -P
指令名称 | 描述 | 默认值 |
---|---|---|
-h | 指定 Redis 服务器 hostname | 127.0.0.1 |
-p | 指定 Redis 服务器端口 | 6379 |
-s | 指定 Redis 服务器 Server | Socket |
-a | 指定 Redis 服务器密码 | |
-c | 指定客户端并发数 | 50 |
-n | 指定总请求数 | 100000 |
-dbnum | 指定 Redis 数据库 | 0 |
-k | 1=keep alive 0=reconnect | 1 |
-r | 使用随机 key,value 对相关指令进行压测 | |
-P | 使用管道(pipeline) | 1(no pipeline) |
-q | 强制退出 Redis,仅展示 query/sec | |
--csv | 使用 CSV 格式输出 | |
-l | 循环运行测试 | |
-t | 运行逗号分隔的测试列表 | |
-I | Idle 模式,仅打开 N 个 idle 连接并等待 |
通过普通方式测试 set 指令和 pipeline 方式测试 set 指令,可以看到 Redis 服务不同的 QPS:
- 普通 set 方式,Redis QPS 大概在 5.3 万左右
- 当使用 pipeline set 时,随着管道内并行请求数量的增加,Redis QPS 可以达到 100 万以上
Redis Pipeline Jedis 使用 pipeline
测试代码
package com.liziba.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import java.io.IOException;
/**
-
测试 pipeline
*/
public class PipelineTest {
public static void main(String[] args) throws IOException {
Jedis client = new Jedis("127.0.0.1", 6379);
<span class="hljs-type">long</span> <span class="hljs-variable">startPipe</span> <span class="hljs-operator">=</span> System.currentTimeMillis();
<span class="hljs-type">Pipeline</span> <span class="hljs-variable">pipe</span> <span class="hljs-operator">=</span> client.pipelined();
pipe.multi();
<span class="hljs-keyword">for</span> (<span class="hljs-type">int</span> <span class="hljs-variable">i</span> <span class="hljs-operator">=</span> <span class="hljs-number">0</span>; i < <span class="hljs-number">100000</span>; i++) {
pipe.set(<span class="hljs-string">"pipe"</span> + i, i + <span class="hljs-string">""</span> );
}
pipe.exec();
pipe.close();
<span class="hljs-type">long</span> <span class="hljs-variable">endPipe</span> <span class="hljs-operator">=</span> System.currentTimeMillis();
System.out.println(<span class="hljs-string">"pipeline set cost time : "</span> + (endPipe - startPipe) + <span class="hljs-string">"ms"</span>);
<span class="hljs-keyword">for</span> (<span class="hljs-type">int</span> <span class="hljs-variable">i</span> <span class="hljs-operator">=</span> <span class="hljs-number">0</span>; i < <span class="hljs-number">100000</span>; i++) {
client.set(<span class="hljs-string">"normal"</span> + i, i + <span class="hljs-string">""</span>);
}
System.out.println(<span class="hljs-string">"normal set cost time : "</span> + (System.currentTimeMillis() - endPipe)+ <span class="hljs-string">"ms"</span>);
}
}
测试结果
java RedisTemplate 使用 pipeline
redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
connection.openPipeline();
for (Long i = aLong; i < aLong + finalStep; i++) {
// key3 runList 待处理队列
connection.rPush(MIDUO_RUN_LIST.getBytes(StandardCharsets.UTF_8), i.toString().getBytes(StandardCharsets.UTF_8));
}
return null;
}
);
java hutool Jedis 使用 pipeline
Jedis jedis1 = RedisDS.create().getJedis();
System.out.println("jedis1 =" + jedis1);
long startPipe = System.currentTimeMillis();
Pipeline pipelined = jedis1.pipelined();
for (int i = 0; i < 1000L; i++) {
pipelined.lpop("jedis1");
}
List<Object> objects = pipelined.<span class="hljs-keyword">syncAndReturnAll();
for (Object object : objects) {
System.out.println("object =" + object);
jedis1.rpush("jedis1", (String) object);
}
pipelined.<span class="hljs-keyword">close();
jedis1.close();