Flink Kafka producer and consumer in practice: storing the consumed messages in MongoDB
Building a Kafka producer and consumer the traditional way takes a fair amount of work, but with Flink's Kafka connector it becomes much simpler.
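My scenario: the service listens to a topic, and the consumer stores that topic's messages in a database so they can be shown in the frontend. When testers need to, they edit a message in the frontend and the service sends it back out to the topic. To tell these re-sent messages apart, the producer adds a field named `test` to every message it sends. The consumer skips any message containing that field, so the service's own messages are not consumed and stored a second time.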
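The loop-prevention idea comes down to two small functions: the producer tags every outgoing message, and the consumer drops tagged ones. Below is a stdlib-only sketch of that rule. It assumes the parsed JSON message is modeled as a `Map<String, Object>` instead of fastjson's `JSONObject`. The class and method names (`TestMarker`, `tag`, `shouldStore`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: a parsed JSON message is modeled as a Map (the post uses fastjson's JSONObject)
final class TestMarker {

    static final String MARKER_FIELD = "test";

    private TestMarker() {
    }

    // Producer side: copy the message and add the marker field before sending it back to the topic
    static Map<String, Object> tag(Map<String, Object> message) {
        Map<String, Object> tagged = new HashMap<>(message);
        tagged.put(MARKER_FIELD, "test");
        return tagged;
    }

    // Consumer side: store only non-empty messages that do not carry the marker field
    static boolean shouldStore(Map<String, Object> message) {
        return message != null && !message.isEmpty() && !message.containsKey(MARKER_FIELD);
    }

    public static void main(String[] args) {
        Map<String, Object> fromUpstream = new HashMap<>();
        fromUpstream.put("database", "shop");
        fromUpstream.put("table", "orders");

        System.out.println(shouldStore(fromUpstream));      // true: stored and shown in the frontend
        System.out.println(shouldStore(tag(fromUpstream))); // false: re-sent by this service, skipped
    }
}
```

Copying the map in `tag` keeps the original message unchanged, which makes the two sides easy to test independently.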
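To test the producer and consumer, start a Kafka broker locally. Then either produce locally and consume in the service, or produce from the service and consume locally, and check that the code behaves as expected.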
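1. The backend is a Spring Boot project. First, add the Flink dependencies to the pom file. The code below also uses fastjson, Lombok, Apache Commons Lang, Spring Data MongoDB and the MongoDB Java driver; add those as usual.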
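```xml
<properties>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- keep all Flink artifacts on the same version -->
    <flink.version>1.7.1</flink.version>
</properties>

<dependencies>
    <!-- Flink DataStream API for Java (the code below uses the Java API) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Universal Kafka connector: FlinkKafkaProducer / FlinkKafkaConsumer -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```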
2. Without further ado, let's start with the producer:
```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

// `log` is assumed to come from Lombok's @Slf4j on the enclosing class
public void sendKafka(String topic, String server, String message) throws Exception {
    log.info("Start producing");
    // Mark the message so this service's own consumer recognizes and skips it
    JSONObject obj = JSONObject.parseObject(message);
    obj.put("test", "test");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", server);

    // Non-parallel source that emits the single message once and then finishes
    DataStreamSource<String> text = env.addSource(new MyNoParalleSource(obj.toString())).setParallelism(1);

    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
    text.addSink(producer);

    // The source is bounded, so execute() returns once the message has been sent
    env.execute("send kafka ok");
}
```
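As you can see, the producer uses a `MyNoParalleSource` class. It is a non-parallel source (parallelism 1) whose `run()` emits the message once and then returns, so the job finishes after that single record has been sent: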
```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyNoParalleSource implements SourceFunction<String> {

    private String message;

    public MyNoParalleSource() {
    }

    public MyNoParalleSource(String message) {
        this.message = message;
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // Emit the single message; returning from run() ends this bounded source
        sourceContext.collect(this.message);
    }

    @Override
    public void cancel() {
        // Nothing to interrupt: run() returns right after one collect()
    }
}
```
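`MyNoParalleSource` emits exactly one record. If you wanted one job to send several messages, a source function usually runs a loop guarded by a `volatile` running flag, which `cancel()` clears. The sketch below keeps that pattern runnable without Flink. The hypothetical `Emitter` interface stands in for Flink's `SourceContext`, and `ListSource` is likewise a made-up name. This is not Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's SourceContext: it just receives emitted records
interface Emitter<T> {
    void collect(T record);
}

// Sketch of a bounded, cancellable source following the run()/cancel() contract
class ListSource {

    private final List<String> messages;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    ListSource(List<String> messages) {
        this.messages = new ArrayList<>(messages);
    }

    // Emits each message in order, stopping early if cancelled; returning ends the source
    void run(Emitter<String> ctx) {
        for (String message : messages) {
            if (!running) {
                return;
            }
            ctx.collect(message);
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new ListSource(Arrays.asList("{\"a\":1}", "{\"b\":2}")).run(received::add);
        System.out.println(received); // [{"a":1}, {"b":2}]
    }
}
```

In a real Flink source with checkpointing enabled, you would also emit inside `synchronized (ctx.getCheckpointLock())`.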
And with that, the producer is done. Pretty neat, and super simple!
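3. The consumer. My goal is to store what the producer sends in a MongoDB database on the consumer side, so this part is a little more complex than the producer: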
```java
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public void consumeKafka(String topic, String server) throws Exception {
    log.info("Start consuming");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", server);

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
    // Start from the latest offsets, i.e. only messages arriving after the job starts;
    // use consumer.setStartFromEarliest() to read the topic from the beginning instead
    consumer.setStartFromLatest();

    DataStream<String> stream = env.addSource(consumer);
    DataStream<Tuple4<String, String, String, String>> sourceStreamTra = stream
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    // Check for blank input before parsing, then skip messages re-sent by our own producer
                    if (StringUtils.isBlank(value)) {
                        return false;
                    }
                    JSONObject obj = JSONObject.parseObject(value);
                    return obj != null && !obj.containsKey("test");
                }
            })
            .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple4<String, String, String, String> map(String value) throws Exception {
                    JSONObject obj = JSONObject.parseObject(value);
                    String dataBase = null;
                    String table = null;
                    if (obj.containsKey("database")) {
                        dataBase = obj.getString("database");
                        table = obj.getString("table");
                    }
                    // (server, topic, "database->table", raw message)
                    return new Tuple4<>(server, topic, dataBase + "->" + table, obj.toString());
                }
            });

    sourceStreamTra.addSink(new MongoSink());
    // Blocks for as long as the streaming job runs: the Kafka source is unbounded
    env.execute();
}
```
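The `map` step turns each message into four fields: server, topic, a source label `database->table`, and the raw message. A message without a `database` field gets the label `null->null`, because Java string concatenation renders `null` as the text "null". Below is a stdlib-only mirror of that mapping. It again models the parsed message as a `Map`. `RecordFields` is a hypothetical class standing in for `Tuple4`, and the sample server and topic values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java mirror of the consumer's Tuple4<server, topic, source, msg>
final class RecordFields {

    final String server; // f0
    final String topic;  // f1
    final String source; // f2, "database->table"
    final String msg;    // f3, the raw message

    RecordFields(String server, String topic, String source, String msg) {
        this.server = server;
        this.topic = topic;
        this.source = source;
        this.msg = msg;
    }

    // Mirrors map(): "database" and "table" are read only when "database" is present
    static RecordFields from(String server, String topic, Map<String, Object> message, String rawJson) {
        Object database = null;
        Object table = null;
        if (message.containsKey("database")) {
            database = message.get("database");
            table = message.get("table");
        }
        // Concatenating null yields the text "null", exactly as in the original map()
        return new RecordFields(server, topic, database + "->" + table, rawJson);
    }

    public static void main(String[] args) {
        Map<String, Object> withSource = new LinkedHashMap<>();
        withSource.put("database", "shop");
        withSource.put("table", "orders");
        System.out.println(from("localhost:9092", "demo", withSource, "{}").source);              // shop->orders
        System.out.println(from("localhost:9092", "demo", new LinkedHashMap<>(), "{}").source); // null->null
    }
}
```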
The sink that writes each record to MongoDB:

```java
// Legacy 3.x driver class; use com.mongodb.client.MongoClient if MongoDBUtil returns the newer type
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

import java.util.Date;

public class MongoSink extends RichSinkFunction<Tuple4<String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    // Created in open() on the worker, so they are transient (not serialized with the job)
    private transient MongoClient mongoClient;
    private transient MongoCollection<Document> mongoCollection;

    @Override
    public void open(Configuration parms) throws Exception {
        super.open(parms);
        // Connect once per sink instance; MongoDBUtil.getConnect() is the author's helper (not shown)
        mongoClient = MongoDBUtil.getConnect();
        MongoDatabase db = mongoClient.getDatabase("databBase"); // replace with your own database name
        mongoCollection = db.getCollection("kafkaRecord");
    }

    @Override
    public void invoke(Tuple4<String, String, String, String> value) throws Exception {
        // f0 = server, f1 = topic, f2 = "database->table", f3 = raw message
        KafkaRecord kafkaRecord = new KafkaRecord("", value.f0, value.f1, value.f2, value.f3, new Date());
        // Reuse the connection opened in open(); CommonMethod.objectToMap() is the author's helper (not shown)
        mongoCollection.insertOne(new Document(CommonMethod.objectToMap(kafkaRecord)));
    }

    @Override
    public void close() throws Exception {
        if (mongoClient != null) {
            mongoClient.close();
        }
        super.close();
    }
}
```
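`RichSinkFunction` is useful because of its lifecycle. Flink calls `open()` once per parallel sink instance, `invoke()` once per record, and `close()` once at the end. A connection therefore belongs in `open()`/`close()`, not in `invoke()`. The stdlib-only sketch below models that call order. `FakeConnection` and `CountingSink` are hypothetical stand-ins and are not the MongoDB driver or Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a database client
final class FakeConnection {

    private final List<String> inserted = new ArrayList<>();
    private boolean closed;

    void insert(String doc) {
        if (closed) {
            throw new IllegalStateException("connection already closed");
        }
        inserted.add(doc);
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    List<String> inserted() {
        return inserted;
    }
}

// Mirrors the open() -> invoke()* -> close() order used for a rich sink function
class CountingSink {

    private FakeConnection connection;
    private int connectionsOpened;

    void open() {
        connection = new FakeConnection();
        connectionsOpened++;
    }

    // Reuse the connection for every record instead of reconnecting
    void invoke(String record) {
        connection.insert(record);
    }

    void close() {
        if (connection != null) {
            connection.close();
        }
    }

    int connectionsOpened() {
        return connectionsOpened;
    }

    FakeConnection connection() {
        return connection;
    }

    public static void main(String[] args) {
        CountingSink sink = new CountingSink();
        sink.open();
        for (int i = 0; i < 3; i++) {
            sink.invoke("record-" + i);
        }
        sink.close();
        System.out.println(sink.connectionsOpened() + " connection, "
                + sink.connection().inserted().size() + " inserts, closed=" + sink.connection().isClosed());
        // prints: 1 connection, 3 inserts, closed=true
    }
}
```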
The `KafkaRecord` entity it stores:

```java
import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.io.Serializable;
import java.util.Date;

@Data
@Document(collection = "kafkaRecord")
public class KafkaRecord implements Serializable {

    @Field("_id")
    String id;

    // Message content
    String msg;

    // Topic
    String topic;

    // Kafka bootstrap server(s)
    String server;

    // Origin of the message: "database->table"
    String source;

    // Operation time
    Date time;

    public KafkaRecord() {
    }

    public KafkaRecord(String id, String server, String topic, String source, String msg, Date time) {
        this.id = id;
        this.server = server;
        this.msg = msg;
        this.topic = topic;
        this.source = source;
        this.time = time;
    }
}
```
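`CommonMethod.objectToMap`, which `MongoSink` uses, is not shown in the post. A hypothetical reflection-based version might look like the sketch below. It uses the declared field names as keys and skips static, synthetic and null fields. With a helper like this, `id` stays `id`, because building an `org.bson.Document` from a map ignores Spring Data's `@Field("_id")`. `ObjectToMapSketch` and `SampleRecord` are made-up names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of an objectToMap helper (the author's CommonMethod is not shown)
final class ObjectToMapSketch {

    private ObjectToMapSketch() {
    }

    // Copies the non-static, non-null fields declared on obj's own class into a map keyed by field name
    static Map<String, Object> objectToMap(Object obj) {
        Map<String, Object> map = new LinkedHashMap<>();
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue; // skip constants such as serialVersionUID and compiler-generated fields
            }
            field.setAccessible(true);
            try {
                Object value = field.get(obj);
                if (value != null) {
                    map.put(field.getName(), value);
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + field.getName(), e);
            }
        }
        return map;
    }
}

// Hypothetical sample type using some of KafkaRecord's field names
class SampleRecord {
    String id = "";
    String msg = "{\"k\":1}";
    String topic = "demo";
    String server = null;
}
```

When a map like this is passed to `insertOne`, the driver adds a generated `_id` because the document has none.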
The consumer is done as well.
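Start the backend service and send a message from a producer. The consumer picks it up and stores it in the database. Keep in mind that messages sent by this service's own `sendKafka` carry the `test` field, so the consumer filters them out. To see a record stored, send the message from another producer, for example a local one as described at the beginning.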
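`consumeKafka` calls `env.execute()` on an unbounded Kafka source, so the call does not return while the job is running. In a Spring Boot service you would therefore start it on a background thread, not on the startup or request thread. The stdlib-only sketch below assumes a hypothetical functional interface `KafkaJob` as a stand-in for the blocking `consumeKafka(topic, server)` call. `BackgroundJobStarter` is also a made-up name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a blocking call such as consumeKafka(topic, server)
@FunctionalInterface
interface KafkaJob {
    void run() throws Exception;
}

final class BackgroundJobStarter {

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kafka-consumer-job");
        t.setDaemon(true); // design choice: this thread alone does not keep the JVM alive
        return t;
    });

    // Returns immediately; the blocking job keeps running on the background thread
    Future<?> start(KafkaJob job) {
        return executor.submit(() -> {
            job.run();
            return null;
        });
    }

    // Interrupts the job thread and stops accepting new jobs
    void shutdown() {
        executor.shutdownNow();
    }
}
```

In the service you might call `starter.start(() -> consumeKafka(topic, server))` once at startup, for example from a Spring Boot `ApplicationRunner`. Any exception that ends the job is captured in the returned `Future` rather than lost.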