Spring boot使用influxDB总结

项目中需要存放大量设备日志, 且需要对其进行简单的数据分析, 信息提取工作.
结合众多考量因素, 项目决定使用时序数据库中的领头羊InfluxDB.

引入依赖

项目中使用influxdb-java, 在pom文件中添加如下依赖 (github地址:https://github.com/influxdata/influxdb-java):

    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.15</version>
    </dependency>

application.yaml文件配置如下所示 (请按照实际情况填写):

spring:
  influx:
    url: *
    password: admin
    user: 123
    database: log_management

配置

(1) 创建配置类

@Configuration
public class InfluxDbConfig {
<span class="hljs-meta">@Value("${spring.influx.url:''}")</span>
<span class="hljs-keyword">private</span> String influxDBUrl;

<span class="hljs-meta">@Value("${spring.influx.user:''}")</span>
<span class="hljs-keyword">private</span> String userName;

<span class="hljs-meta">@Value("${spring.influx.password:''}")</span>
<span class="hljs-keyword">private</span> String password;

<span class="hljs-meta">@Value("${spring.influx.database:''}")</span>
<span class="hljs-keyword">private</span> String database;

<span class="hljs-meta">@Bean</span>
<span class="hljs-keyword">public</span> InfluxDbUtils <span class="hljs-title function_">influxDbUtils</span><span class="hljs-params">()</span> {
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-title class_">InfluxDbUtils</span>(userName, password, influxDBUrl, database, <span class="hljs-string">""</span>);
}

}

@Data
 public class InfluxDbUtils {
    private String userName;
    private String password;
    private String url;
    public String database;
    private String retentionPolicy;
    // InfluxDB 实例
    private InfluxDB influxDB;

    // 数据保存策略
    public static String policyNamePix = "logRetentionPolicy_";

    public InfluxDbUtils(String userName, String password, String url, String database,
                         String retentionPolicy) {
        this.userName = userName;
        this.password = password;
        this.url = url;
        this.database = database;
        this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
        this.influxDB = influxDbBuild();
    }

    /**
     * 连接数据库 ,若不存在则创建
     *
     * @return influxDb 实例
     */
    private InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(url, userName, password);
        }
        try {
            createDB(database);
            influxDB.setDatabase(database);
        } catch (Exception e) {
            log.error("create influx db failed, error: {}", e.getMessage());
        } finally {
            influxDB.setRetentionPolicy(retentionPolicy);
        }
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
        return influxDB;
    }
 }

构建实体类

InfluxDB中,measurement对应于传统关系型数据库中的table(database为配置文件中的log_management).
InfluxDB里存储的数据称为时间序列数据, 时序数据有零个多个数据点.
数据点包括time(一个时间戳),measurement( 例如logInfo),零个或多个tag,其对应于level,module,device_id), 至少一个field( 即日志内容,msg=something error).
InfluxDB会根据tag数值建立时间序列( 因此tag数值不能选取诸如UUID作为特征值, 易导致时间序列过多, 导致InfluxDB崩溃 ), 并建立相应索引, 以便优化诸如查询速度.

@Builder
@Data
@Measurement(name = "logInfo")
public class LogInfo {
<span class="hljs-comment">// Column中的name为measurement中的列名</span>
<span class="hljs-comment">// 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换</span>
<span class="hljs-meta">@Column(name = "time")</span>
<span class="hljs-keyword">private</span> String time;
<span class="hljs-comment">// 注解中添加tag = true,表示当前字段内容为tag内容</span>
<span class="hljs-meta">@Column(name = "module", tag = true)</span>
<span class="hljs-keyword">private</span> String <span class="hljs-keyword">module</span>;
<span class="hljs-meta">@Column(name = "level", tag = true)</span>
<span class="hljs-keyword">private</span> String level;
<span class="hljs-meta">@Column(name = "device_id", tag = true)</span>
<span class="hljs-keyword">private</span> String deviceId;
<span class="hljs-meta">@Column(name = "msg")</span>
<span class="hljs-keyword">private</span> String msg;

}

保存数据

以下代码为单条日志保存,influxdb-java亦支持批量保存( 因为与InfluxDB通讯均是通过http, 因此建议批量保存以减少性能损耗 ).

    LogInfo logInfo = LogInfo.builder()
        .level(jsonObject.getString("level"))
        .module(module)
        .deviceId(deviceId)
        .msg(jsonObject.getString("msg"))
        .build();
    Point point = Point.measurementByPOJO(logInfo.getClass())
        .addFieldsFromPOJO(logInfo)
        .time(jsonObject.getLong("time"), TimeUnit.MILLISECONDS)
        .build();
    // 出于业务考量, 设备可以设置不同的保存策略 (策略名为固定前缀 + 设备 ID)
    influxDB.write(influxDBUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);

查询数据

因为代码与业务耦合比较厉害, 因此此处仅截选做概要示范.

    // InfluxDB 支持分页查询, 因此可以设置分页查询条件
    String pageQuery = "LIMIT" + request.getPageSize() + "OFFSET" + ((request.getPageNum() - 1) * request.getPageSize());
    // 此处查询所有内容, 如果
    String queryCmd = "SELECT * FROM"
        // 查询指定设备下的日志信息
        // 要指定从 RetentionPolicyName(保存策略前缀 + 设备 ID).measurement(logInfo) 中查询指定数据 )
        + InfluxDbUtils.policyNamePix + request.getDeviceId() + "." + "logInfo"
        // 添加查询条件 (注意查询条件选择 tag 值, 选择 field 数值会严重拖慢查询速度)
        + queryCondition
        // 查询结果需要按照时间排序
        + "ORDER BY time DESC"
        // 添加分页查询条件
        + pageQuery;

选择时序数据库, 不建议使用删除以及更新操作, 因此不做介绍.

可以通过创建或者RetentionPolicy, 来添加或者更新数据的删除时间.

PS:
如果您觉得我的文章对您有帮助,请关注我的微信公众号,谢谢!
程序员打怪之路