Spring Boot + RabbitMQ + InfluxDB + Grafana Monitoring in Practice

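This article assumes a basic familiarity with Spring Boot, Spring Cloud, or a similar microservice framework. With that background, the steps below are easy to follow.

  I hope it is useful to anyone who needs it.

  First, a little background.

  Spring Boot integrates a great many third-party frameworks. Here we only discuss and use the parts related to performance monitoring and JVM monitoring. Everything else is out of scope even where it is loosely related, which is why some Spring Boot background is assumed. Now, on to the topic.

  First, the overall data flow. It has two main lines: one collects performance data for interfaces and methods, and the other collects JVM metrics from the Spring Boot microservices. Both end up in the InfluxDB time-series database, and Grafana is then used to display the data and raise alerts.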

  

 

 

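  0. Base services

    Quite a few base services are involved: RabbitMQ, the Eureka registry, InfluxDB, and Grafana (if any of these are unfamiliar, a quick search will bring you up to speed). A brief description of each:

      RabbitMQ is a popular message broker. It is used here to collect performance monitoring data from the Spring Boot applications. Why RabbitMQ rather than, say, Kafka? It is convenient for testing, its performance is sufficient, and Spring Boot integrates with it well.

      Eureka is the service registry. Anyone who has read about or used Spring Cloud knows that Eureka is the registry it mainly recommends. Why that is so is outside the scope of this article. Here it is used to discover the applications registered with it and to fetch their instance information.

      InfluxDB and Grafana: why these two rather than an alternative such as the ELK stack (Elasticsearch, Logstash, Kibana)? InfluxDB is a time-series database and reportedly compresses data much better than Elasticsearch (I have not measured this myself; it is based on documentation I have read). InfluxDB's query language is also very close to the SQL of relational databases such as MySQL, so it is easy to pick up, and Grafana supports alerting.

      That is enough about the tools. How to deploy them is not the focus of this article, so please look that up separately. If you know some Docker, you can simply pull the images and start them for testing (that is how I ran the base services above, except Eureka).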

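  1. The monitored application

    The monitored application is of course a Spring Boot project, but it needs a few extra dependencies, some annotations, and some configuration changes.

    Dependencies (all of these are required):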

    

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
        </dependency>
        

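      A quick note on what each dependency does: spring-cloud-starter-netflix-eureka-client registers the application with the registry, spring-cloud-starter-stream-rabbit sends messages to RabbitMQ, spring-boot-starter-actuator exposes the monitoring REST endpoints, and spring-cloud-starter-hystrix provides circuit breaking and the performance metrics.

      The springCloudHystrixStream exchange consumed later in this article is the default destination of spring-cloud-netflix-hystrix-stream, so if no metrics reach RabbitMQ, check that this artifact is on the classpath as well.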
      
      Annotations

@EnableHystrix          // enables Hystrix performance monitoring
@RefreshScope           // refreshes configuration; unrelated to this article
@EnableAutoConfiguration
@EnableFeignClients     // RPC calls; unrelated to this article
@RestController
@SpringBootApplication
public class ServerTestApplication {
    protected final static Logger logger = LoggerFactory.getLogger(ServerTestApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(ServerTestApplication.class, args);
    }
}

    Configuration file

hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 60000
hystrix.threadpool.default.coreSize: 100
spring:
  application:
    name: spring-cloud-server2-test
  rabbitmq:
    host: 10.10.12.21
    port: 5672
    username: user
    password: password

encrypt:
  failOnError: false
server:
  port: 8081
eureka:
  instance:
    appname: spring-cloud-server2-test
    prefer-ip-address: true
  client:
    serviceUrl:
      defaultZone: http://IP:PORT/eureka/ # registry address
    eureka-server-total-connections-per-host: 500
endpoints:
  refresh:
    sensitive: false
  metrics:
    sensitive: false
  dump:
    sensitive: false
  auditevents:
    sensitive: false
  features:
    sensitive: false
  mappings:
    sensitive: false
  trace:
    sensitive: false
  autoconfig:
    sensitive: false
  loggers:
    sensitive: false

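    A brief explanation of the settings under endpoints: by default these paths require authorization. Marking them as not sensitive removes that requirement, so we can easily call the corresponding endpoints of every service registered with the registry. One more point: for an interface to produce performance data, its method must carry an annotation like the following.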

    @Value("${name}")
    private String name;

    @HystrixCommand(
            commandProperties = {
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "20000")
            },
            threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "64")
            },
            threadPoolKey = "test1")
    @GetMapping("/testpro1")
    public String getStringtest1() {
        return name;
    }

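  At this point your application can emit performance data. You can check by visiting http://IP:port/hystrix.stream.

  If the endpoint responds with a stream of metrics, your application is basically fine. Only basically, because that output does not show whether the performance data is actually being sent to RabbitMQ. For that you need to check the logs. If it fails for you, we can discuss it in the comments. Let us stay on the main line for now.

  That is all for the Spring Boot application. On to the next topic.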


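  2. Collecting performance metrics

    The output of http://IP:port/hystrix.stream, visited above, is the performance data for interfaces and methods. If everything so far works, that data is also being sent to RabbitMQ, so we can simply consume it from there.

    The collector service for performance metrics mainly uses the following dependencies:

        <dependency>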
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.github.miwurster/spring-data-influxdb -->
        <dependency>
            <groupId>org.influxdb</groupId>
            <artifactId>influxdb-java</artifactId>
            <version>2.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>

    Here is the code:

package application;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author zyg
 */
@SpringBootApplication
public class RabbitMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQApplication.class, args);
    }
}

package application;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zyg
 */
@Configuration
public class RabbitMQConfig {
    public final static String QUEUE_NAME = "spring-boot-queue";
    public final static String EXCHANGE_NAME = "springCloudHystrixStream";
    public final static String ROUTING_KEY = "#";

    // Create the queue
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    // Create a topic exchange
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // Bind the queue to the exchange using the routing key
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        // RabbitMQ IP and port
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("IP", 5672);
        connectionFactory.setUsername("user");
        connectionFactory.setPassword("password");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

package application;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

/**
 * @author zyg
 */
public class InfluxDBConnect {
    private String username; // user name
    private String password; // password
    private String openurl;  // connection URL
    private String database; // database name

    private InfluxDB influxDB;

    public InfluxDBConnect(String username, String password, String openurl, String database) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
    }

    /** Connects to the time-series database and returns the InfluxDB client. */
    public InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl, username, password);
            influxDB.createDatabase(database);
        }
        return influxDB;
    }

    /**
     * Sets the retention policy: "defalut" is the policy name, database is the database name,
     * 30d keeps data for 30 days, 1 is the replication factor, and the trailing DEFAULT
     * makes this the default policy.
     */
    public void createRetentionPolicy() {
        String command = String.format(
                "CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "defalut", database, "30d", 1);
        this.query(command);
    }

    /**
     * Query.
     *
     * @param command the query statement
     * @return the query result
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, database));
    }

    /**
     * Insert.
     *
     * @param measurement measurement (table)
     * @param tags        tags
     * @param fields      fields
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        Builder builder = Point.measurement(measurement);
        // currentTime is multiplied by 1,000,000 and written as nanoseconds
        builder.time(((long) fields.get("currentTime")) * 1000000, TimeUnit.NANOSECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(database, "", builder.build());
    }

    /**
     * Delete.
     *
     * @param command the delete statement
     * @return the error message, if any
     */
    public String deleteMeasurementData(String command) {
        QueryResult result = influxDB.query(new Query(command, database));
        return result.getError();
    }

    /**
     * Creates a database.
     *
     * @param dbName database name
     */
    public void createDB(String dbName) {
        influxDB.createDatabase(dbName);
    }

    /**
     * Deletes a database.
     *
     * @param dbName database name
     */
    public void deleteDB(String dbName) {
        influxDB.deleteDatabase(dbName);
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getOpenurl() {
        return openurl;
    }

    public void setOpenurl(String openurl) {
        this.openurl = openurl;
    }

    public void setDatabase(String database) {
        this.database = database;
    }
}

package application;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zyg
 */
@Configuration
public class InfluxDBConfiguration {

    private String username = "admin";          // user name
    private String password = "admin";          // password
    private String openurl = "http://IP:8086";  // InfluxDB connection URL
    private String database = "test_db";        // database

    @Bean
    public InfluxDBConnect getInfluxDBConnect() {
        InfluxDBConnect influxDB = new InfluxDBConnect(username, password, openurl, database);
        influxDB.influxDbBuild();
        influxDB.createRetentionPolicy();
        return influxDB;
    }
}

package application;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @author zyg
 */
@Component
public class Consumer {
    protected final static Logger logger = LoggerFactory.getLogger(Consumer.class);

    private ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private InfluxDBConnect influxDB;

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void sendToSubject(org.springframework.amqp.core.Message message) {
        String payload = new String(message.getBody());
        logger.info(payload);

        if (payload.startsWith("\"")) {
            // Legacy payload from an Angel client
            payload = payload.substring(1, payload.length() - 1);
            payload = payload.replace("\\\"", "\"");
        }
        try {
            if (payload.startsWith("[")) {
                @SuppressWarnings("unchecked")
                List<Map<String, Object>> list = this.objectMapper.readValue(payload, List.class);
                for (Map<String, Object> map : list) {
                    sendMap(map);
                }
            } else {
                @SuppressWarnings("unchecked")
                Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
                sendMap(map);
            }
        } catch (IOException ex) {
            logger.error("Error receiving hystrix stream payload: " + payload, ex);
        }
    }

    private void sendMap(Map<String, Object> map) {
        Map<String, Object> data = getPayloadData(map);
        // Drop the two latency entries before the remaining entries are written as fields
        data.remove("latencyExecute");
        data.remove("latencyTotal");
        Map<String, String> tags = new HashMap<String, String>();
        tags.put("type", data.get("type").toString());
        tags.put("name", data.get("name").toString());
        tags.put("instanceId", data.get("instanceId").toString());
        // tags.put("group", data.get("group").toString());
        influxDB.insert("testaaa", tags, data);
        // for (String key : data.keySet()) {
        //     logger.info("{}:{}", key, data.get(key));
        // }
    }

    public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) {
        @SuppressWarnings("unchecked")
        Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin");
        String instanceId = null;
        if (origin.containsKey("id")) {
            instanceId = origin.get("host") + ":" + origin.get("id").toString();
        }
        if (!StringUtils.hasText(instanceId)) {
            // TODO: instanceid template
            instanceId = origin.get("serviceId") + ":" + origin.get("host") + ":" + origin.get("port");
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> data = (Map<String, Object>) jsonMap.get("data");
        data.put("instanceId", instanceId);
        return data;
    }
}


    There is not much to add: the service receives messages from RabbitMQ and saves them to InfluxDB.
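
    To make that mapping concrete, here is a minimal plain-Java sketch (no Spring, RabbitMQ, or InfluxDB on the classpath) of how one Hystrix stream message could be turned into tags and a timestamp. The class name HystrixPointSketch and the sample values in main are hypothetical. The instanceId rule follows getPayloadData in the Consumer, and currentTime is assumed to be epoch milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: derive tags and a nanosecond timestamp from one Hystrix stream message. */
final class HystrixPointSketch {

    /** Prefer host:id when the origin carries an id, otherwise fall back to serviceId:host:port. */
    static String instanceId(Map<String, Object> origin) {
        String instanceId = null;
        if (origin.containsKey("id")) {
            instanceId = origin.get("host") + ":" + origin.get("id");
        }
        if (instanceId == null || instanceId.trim().isEmpty()) {
            instanceId = origin.get("serviceId") + ":" + origin.get("host") + ":" + origin.get("port");
        }
        return instanceId;
    }

    /** The three values the Consumer stores as tags rather than as plain fields. */
    static Map<String, String> tags(Map<String, Object> data, String instanceId) {
        Map<String, String> tags = new HashMap<>();
        tags.put("type", String.valueOf(data.get("type")));
        tags.put("name", String.valueOf(data.get("name")));
        tags.put("instanceId", instanceId);
        return tags;
    }

    /** Assumption: currentTime is epoch milliseconds, so it is converted to nanoseconds. */
    static long timestampNanos(Map<String, Object> data) {
        long millis = ((Number) data.get("currentTime")).longValue();
        return TimeUnit.MILLISECONDS.toNanos(millis);
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration only.
        Map<String, Object> origin = new HashMap<>();
        origin.put("host", "10.10.12.51");
        origin.put("port", 8082);
        origin.put("serviceId", "spring-cloud-server1-test");

        Map<String, Object> data = new HashMap<>();
        data.put("type", "HystrixCommand");
        data.put("name", "getStringtest1");
        data.put("currentTime", 1500000000000L);

        String id = instanceId(origin);
        System.out.println(tags(data, id));
        System.out.println(timestampNanos(data));
    }
}
```

    Keeping type, name, and instanceId as tags is what allows the Grafana queries to filter on them in a WHERE clause.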

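  3. Collecting JVM data

    Collecting JVM data is simple. The idea is to poll the endpoint of each monitored service on a schedule and insert the returned data into InfluxDB.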
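
    The polling loop itself can be sketched in plain Java. This is only an illustration under stated assumptions: FixedDelayPollerSketch and its method names are hypothetical, and it uses the JDK's ScheduledExecutorService rather than Spring's scheduling annotations, so it shows the fixed-delay idea and nothing about the real collector's wiring.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: run one polling round, wait a fixed delay, then run the next. */
final class FixedDelayPollerSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger rounds = new AtomicInteger();

    /** Starts polling; the delay is measured from the end of one round to the start of the next. */
    void start(Runnable pollOnce, long delayMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollOnce.run();
            } catch (RuntimeException e) {
                // With ScheduledExecutorService an uncaught exception suppresses all later runs,
                // so failures are reported here instead of being rethrown.
                System.err.println("poll failed: " + e.getMessage());
            } finally {
                rounds.incrementAndGet();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    int completedRounds() {
        return rounds.get();
    }

    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        FixedDelayPollerSketch poller = new FixedDelayPollerSketch();
        CountDownLatch threeRounds = new CountDownLatch(3);
        // The "poll" here only counts down; a real round would call each service's endpoint.
        poller.start(threeRounds::countDown, 200);
        threeRounds.await(10, TimeUnit.SECONDS);
        poller.stop();
        System.out.println("completed rounds: " + poller.completedRounds());
    }
}
```

    A fixed delay, as opposed to a fixed rate, means a slow round never overlaps with the next one, which matters when a round has to contact many services.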

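    I will skip the dependency list. Note that this service must itself register with the Eureka registry, because it needs to discover every service in order to fetch its monitoring data.

    The InfluxDB insertion code is much the same as above, with one additional batch insert method.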

    

package com.zjs.collection;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * @author zyg
 */
@EnableEurekaClient
@SpringBootApplication
public class ApplictionCollection {

    public static void main(String[] args) {
        SpringApplication.run(ApplictionCollection.class, args);
    }
}

    // Added to InfluxDBConnect; also needs: import java.util.List; import org.influxdb.dto.BatchPoints;

    /**
     * Batch insert.
     *
     * @param measurement measurement (table)
     * @param tags        tags
     * @param fieldslist  one field map per point
     */
    public void batchinsert(String measurement, Map<String, String> tags, List<Map<String, Object>> fieldslist) {
        org.influxdb.dto.BatchPoints.Builder batchbuilder = BatchPoints.database(database);

        for (Map<String, Object> map : fieldslist) {
            Builder builder = Point.measurement(measurement);
            tags.put("instanceId", map.get("instanceId").toString());
            // currentTime is already in nanoseconds here
            builder.time((long) map.get("currentTime"), TimeUnit.NANOSECONDS);
            builder.tag(tags);
            builder.fields(map);
            batchbuilder.point(builder.build());
        }

        BatchPoints batchPoints = batchbuilder.build();
        System.out.println(batchPoints.toString());
        influxDB.write(batchPoints);
    }

package com.zjs.collection;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

/**

  • 获取微服务实例
  • @author zyg

*/
@Component
@SpringBootApplication
@EnableScheduling
public class MicServerInstanceInfoHandle {

</span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">final</span> <span style="color: rgba(0, 0, 255, 1)">static</span> Logger logger = LoggerFactory.getLogger(MicServerInstanceInfoHandle.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 0, 255, 1)">final</span> String pathtail = "/metrics/mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*"<span style="color: rgba(0, 0, 0, 1)">; Map</span>&lt;String, String&gt;<span style="color: rgba(0, 0, 0, 1)"> tags; ThreadPoolExecutor threadpool; @Autowired DiscoveryClient dc; @Autowired RestTemplate restTemplate; </span><span style="color: rgba(0, 0, 255, 1)">final</span> <span style="color: rgba(0, 0, 255, 1)">static</span> LinkedBlockingQueue&lt;Map&lt;String, Object&gt;&gt; jsonMetrics = <span style="color: rgba(0, 0, 255, 1)">new</span> LinkedBlockingQueue&lt;&gt;(1000<span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)"> * 初始化实例 可以吧相关参数设置到配置文件 </span><span style="color: rgba(0, 128, 0, 1)">*/</span> <span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> MicServerInstanceInfoHandle() { tags </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> HashMap&lt;String, String&gt;<span style="color: rgba(0, 0, 0, 1)">(); threadpool </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> ThreadPoolExecutor(4, 20, 60, TimeUnit.SECONDS, <span style="color: rgba(0, 0, 255, 1)">new</span> ArrayBlockingQueue&lt;&gt;(100<span style="color: rgba(0, 0, 0, 1)">)); } @Autowired </span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> InfluxDBConnect influxDB; </span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)"> * metrics数据获取 </span><span style="color: rgba(0, 128, 0, 1)">*/</span><span style="color: 
rgba(0, 0, 0, 1)"></span>
    @Scheduled(fixedDelay = 2000)
    public void metricsDataObtain() {
        logger.info("Start fetching metrics data");
        List<String> servicelist = dc.getServices();
        for (String str : servicelist) {
            List<ServiceInstance> silist = dc.getInstances(str);
            for (ServiceInstance serviceInstance : silist) {
                // One task per registered instance, run on the thread pool
                threadpool.execute(new MetricsHandle(serviceInstance));
            }
        }
    }

    /**
     * Insert the buffered data into the InfluxDB database.
     */
    @Scheduled(fixedDelay = 5000)
    public void metricsDataToInfluxDB() {
        logger.info("Start batch insert of metrics data into InfluxDB");
        ArrayList<Map<String, Object>> metricslist = new ArrayList<>();
        // Move everything currently buffered into the local list
        MicServerInstanceInfoHandle.jsonMetrics.drainTo(metricslist);
        if (!metricslist.isEmpty()) {
            logger.info("Batch insert count: {}", metricslist.size());
            influxDB.batchinsert("metrics", tags, metricslist);
        }
        logger.info("Finished batch insert of metrics data");
    }

    @Bean
    public RestTemplate getRestTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        SimpleClientHttpRequestFactory achrf = new SimpleClientHttpRequestFactory();
        achrf.setConnectTimeout(10000);
        achrf.setReadTimeout(10000);
        restTemplate.setRequestFactory(achrf);
        return restTemplate;
    }

    class MetricsHandle extends Thread {
        private ServiceInstance serviceInstanc;

        public MetricsHandle(ServiceInstance serviceInstance) {
            serviceInstanc = serviceInstance;
        }

        @Override
        public void run() {
            try {
                logger.info("Fetching metrics data for application {}:{}:{}",
                        serviceInstanc.getServiceId(), serviceInstanc.getHost(), serviceInstanc.getPort());
                @SuppressWarnings("unchecked")
                Map<String, Object> mapdata = restTemplate
                        .getForObject(serviceInstanc.getUri().toString() + pathtail, Map.class);
                mapdata.put("instanceId", serviceInstanc.getServiceId() + ":" + serviceInstanc.getHost() + ":"
                        + serviceInstanc.getPort());
                mapdata.put("type", "metrics");
                // Milliseconds converted to nanoseconds
                mapdata.put("currentTime", System.currentTimeMillis() * 1000000);
                MicServerInstanceInfoHandle.jsonMetrics.add(mapdata);
            } catch (Exception e) {
                logger.error("instanceId:{},host:{},port:{},path:{},exception:{}",
                        serviceInstanc.getServiceId(), serviceInstanc.getHost(), serviceInstanc.getPort(),
                        serviceInstanc.getUri(), e.getMessage());
            }
        }
    }

}


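    A brief note on this line of code: final String pathtail = "/metrics/mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*"; . The metrics path exposes a lot of information, but we do not need all of it, so we fetch selectively to save bandwidth and time. The key class above, MicServerInstanceInfoHandle, fetches with multiple threads, mainly because a single thread may not be able to poll every instance in time when the registry holds hundreds or thousands of services. It also buffers the results in a queue and inserts them into InfluxDB in batches.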
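    The thread pool plus queue buffer described above can be sketched in isolation. This is only a minimal illustration, not the author's class: MetricsBufferSketch, fetch, collect and drainBatch are hypothetical names, and fetch fakes the HTTP call so the example runs without a registry or InfluxDB.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the collector described in the text.
class MetricsBufferSketch {
    // Shared buffer: worker threads add to it, the batch writer drains it.
    static final BlockingQueue<Map<String, Object>> BUFFER = new LinkedBlockingQueue<>();

    // Simulates fetching metrics from one instance (no real HTTP call here).
    static Map<String, Object> fetch(String instanceId) {
        Map<String, Object> data = new HashMap<>();
        data.put("instanceId", instanceId);
        data.put("type", "metrics");
        data.put("currentTime", System.currentTimeMillis() * 1_000_000L); // ms -> ns
        return data;
    }

    // Fans the fetches out to a pool so one slow instance does not hold up the rest.
    static void collect(List<String> instanceIds, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String id : instanceIds) {
            pool.execute(() -> BUFFER.add(fetch(id)));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    // Moves everything currently buffered into one list, ready for a single batch write.
    static List<Map<String, Object>> drainBatch() {
        List<Map<String, Object>> batch = new ArrayList<>();
        BUFFER.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        ids.add("SERVICE-A:10.0.0.1:8080");
        ids.add("SERVICE-B:10.0.0.2:8081");
        collect(ids, 4);
        System.out.println("batch size: " + drainBatch().size());
    }
}
```

    Because drainTo empties the queue in one call, the writer can run on a slower schedule than the fetchers without losing points in between.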

  4. Results

  

    

  If your data collection succeeded, you can draw charts like the ones above. The corresponding SQL is below:

  

SELECT mean("rollingCountFallbackSuccess"), mean("rollingCountSuccess") FROM "testaaa" WHERE ("instanceId" = 'IP:spring-cloud-server1-test:8082' AND "type" = 'HystrixCommand') AND $timeFilter GROUP BY time($__interval) fill(null)

SELECT mean("currentPoolSize") FROM "testaaa" WHERE ("type" = 'HystrixThreadPool' AND "instanceId" = '10.10.12.51:spring-cloud-server1-test:8082') AND $timeFilter GROUP BY time($__interval) fill(null)

SELECT "heap", "heap.committed","heap.used","mem","mem.free","nonheap","nonheap.committed","nonheap.used" FROM "metrics" WHERE ("instanceId" = 'SPRING-CLOUD-SERVER1-TEST:10.10.12.51:8082') AND $timeFilter
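  For queries like these to return data, each point has to carry the instanceId and the field names being selected. Below is a rough sketch of how one collected metrics map could be rendered as InfluxDB line protocol. It is not the batchinsert method used earlier: LineProtocolSketch, escape and toLine are hypothetical names, and storing instanceId and type as tags and every numeric value as a float field is an assumption made for illustration.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper: renders one metrics map as a line-protocol string.
class LineProtocolSketch {
    // Escapes commas, equals signs and spaces, which are special in tag values and field keys.
    static String escape(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    static String toLine(String measurement, Map<String, Object> point) {
        StringJoiner fields = new StringJoiner(",");
        // TreeMap gives a stable field order, which keeps the output predictable.
        for (Map.Entry<String, Object> e : new TreeMap<>(point).entrySet()) {
            boolean isTimestamp = "currentTime".equals(e.getKey());
            if (e.getValue() instanceof Number && !isTimestamp) {
                // Assumption: write every numeric value as a float, in plain notation.
                double value = ((Number) e.getValue()).doubleValue();
                fields.add(escape(e.getKey()) + "=" + BigDecimal.valueOf(value).toPlainString());
            }
        }
        if (fields.length() == 0) {
            throw new IllegalArgumentException("a point needs at least one field");
        }
        // Layout: measurement,tag=value,... field=value,... timestamp(ns)
        return measurement
                + ",instanceId=" + escape(String.valueOf(point.get("instanceId")))
                + ",type=" + escape(String.valueOf(point.get("type")))
                + " " + fields
                + " " + point.get("currentTime");
    }
}
```

  Keeping instanceId as a tag rather than a field means it is indexed, which suits the WHERE clauses above, but that choice depends on how the real insert code is written.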

  That basically wraps things up.

  5. Optimizations and ideas

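    The base services above all need to be highly available, no question, and that is something you have to learn. If I have time I will introduce them one by one; you can also search for the relevant material yourself!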

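    Some may ask: there is a small plugin called telegraf that can collect this data directly and aggregate the results for monitoring, so why not use it? I actually used telegraf before, but I ran into a problem: every time the monitored application restarted, the field names changed, because telegraf uses the class instance name as the field name. That is very inconvenient, since every restart means rewriting the SQL statements. Writing the collection code did not seem hard, so I wrote it myself! If anyone knows telegraf well and can solve the problem I described, please leave me a comment. Thanks in advance!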

    Some parts still need optimizing; for example, IPs, ports and the like can all be moved into a configuration file.
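    As a rough sketch of that idea, the snippet below reads connection settings from properties text with fallbacks. The class name and the keys (monitor.influxdb.url and so on) are made up for illustration; in a Spring Boot application the same values would more naturally live in application.properties and be injected with @Value or @ConfigurationProperties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical settings holder; the keys and defaults are illustrative only.
class CollectorConfigSketch {
    final String influxUrl;
    final String database;
    final int batchIntervalMs;

    CollectorConfigSketch(Properties props) {
        // Each lookup falls back to a default when the key is absent.
        influxUrl = props.getProperty("monitor.influxdb.url", "http://localhost:8086");
        database = props.getProperty("monitor.influxdb.database", "metrics");
        batchIntervalMs = Integer.parseInt(props.getProperty("monitor.batch-interval-ms", "5000"));
    }

    // Parses properties-format text, e.g. the contents of a config file.
    static CollectorConfigSketch parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not read configuration", e);
        }
        return new CollectorConfigSketch(props);
    }

    public static void main(String[] args) {
        CollectorConfigSketch cfg = parse("monitor.influxdb.url=http://10.0.0.5:8086\n");
        System.out.println(cfg.influxUrl + " / " + cfg.database + " / " + cfg.batchIntervalMs);
    }
}
```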

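    Another idea: could the application itself collect JVM information, the same way performance information is collected, and send it directly to MQ, to be inserted into InfluxDB from there?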
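    One possible shape for that idea is sketched below using the JDK's java.lang.management beans, so the application builds its own JVM snapshot instead of being polled. This is an assumption about how it might look, not something the setup above does: JvmSnapshotSketch, snapshot and publish are hypothetical names, the field name gc.count and the KB units are chosen for illustration, and the sender is left as a plain Consumer where a real version would hand the map to an MQ client.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-application collector built on the standard JDK MXBeans.
class JvmSnapshotSketch {
    static Map<String, Object> snapshot(String instanceId) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();

        Map<String, Object> data = new LinkedHashMap<>();
        data.put("instanceId", instanceId);
        data.put("type", "metrics");
        // Sizes are converted from bytes to KB (an illustrative choice).
        data.put("heap.used", heap.getUsed() / 1024);
        data.put("heap.committed", heap.getCommitted() / 1024);
        data.put("nonheap.used", nonHeap.getUsed() / 1024);
        data.put("nonheap.committed", nonHeap.getCommitted() / 1024);
        data.put("threads", threads.getThreadCount());
        data.put("classes", classes.getLoadedClassCount());

        long gcCount = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            gcCount += Math.max(gc.getCollectionCount(), 0); // -1 means "undefined"
        }
        data.put("gc.count", gcCount); // hypothetical field name, summed over all collectors
        data.put("currentTime", System.currentTimeMillis() * 1_000_000L); // ms -> ns
        return data;
    }

    // The sender stands in for whatever would publish the map to the message queue.
    static void publish(String instanceId, Consumer<Map<String, Object>> sender) {
        sender.accept(snapshot(instanceId));
    }

    public static void main(String[] args) {
        publish("DEMO-SERVICE:localhost:8080", System.out::println);
    }
}
```

    Pushing from the application removes the need for the collector to reach every instance over HTTP, at the cost of adding a dependency on the queue to each monitored service.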

  6. Summary

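    In just two or three years since its release, spring boot has rapidly become popular. Its knowledge ecosystem has matured and development costs keep falling, so adoption keeps growing. Microservices are great, but we also have to use them well, and monitoring is an important part of that. Imagine your data center running thousands of services: keeping them stable and spotting the faulty ones promptly matters a great deal!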

    I hope the above is helpful to everyone.