flink之kafka生产和消费实战-将生产数据存放到mongodb中

按传统方式构建一个 Kafka 的生产者和消费者还是比较费劲的,但引入 Flink 的 Kafka 连接器后,就会变得非常容易;

我的场景:监听一个 topic,消费者将该 topic 的消息存入数据库并展示在前端;测试需要时,可以在前端修改消息,再将其重新发送出去。因此我在生产者发送的消息里加了一个字段 test,用来标记这是从自己的服务发出去的消息;消费者遇到带有该字段的消息时不再消费入库;

在测试生产者和消费者的时候,可以先在自己本地起一个 kafka,然后本地生产,服务消费,看代码是否 ok;  或者在服务生产,本地消费,看代码是否 ok

1. 后端是一个 Spring Boot 工程,首先需要在 pom 文件中引入依赖


<properties>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- Keep every Flink artifact on one version; mixing 1.7.0 and 1.7.1 invites classpath conflicts. -->
    <flink.version>1.7.1</flink.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Use the shared Scala suffix instead of hard-coding _2.11. -->
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

2. 话不多说,直接开始先写生产者

public void sendKafka(String topic, String server, String message) throws Exception {
        log.info("开始生产");
        JSONObject obj = JSONObject.parseObject(message);
        obj.put("test","test");
    StreamExecutionEnvironment env </span>=<span style="color: rgba(0, 0, 0, 1)"> StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Properties();
    properties.setProperty(</span>"bootstrap.servers"<span style="color: rgba(0, 0, 0, 1)">, server);
    DataStreamSource</span>&lt;String&gt; text = env.addSource(<span style="color: rgba(0, 0, 255, 1)">new</span> MyNoParalleSource(obj.toString())).setParallelism(1<span style="color: rgba(0, 0, 0, 1)">);
    FlinkKafkaProducer</span>&lt;String&gt; producer = <span style="color: rgba(0, 0, 255, 1)">new</span> FlinkKafkaProducer(topic, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> SimpleStringSchema(), properties);
    text.addSink(producer);
    env.execute(</span>"send kafka ok"<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>

可以看到里面用到了 MyNoParalleSource 类,它是一个并行度为 1 的数据源,只发出一条消息就结束,用来生产数据

public class MyNoParalleSource implements SourceFunction<String> {
String message;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> MyNoParalleSource(){

}

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> MyNoParalleSource(String message) {
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.message =<span style="color: rgba(0, 0, 0, 1)"> message;
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> run(SourceContext&lt;String&gt; sourceContext) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
        sourceContext.collect(</span><span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.message);
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> cancel() {

}

}

此时生产者就写完了,是不是很优秀,超级简单;

3. 消费者(由于我的目的是将生产者生产的消息在消费者端存入 MongoDB 数据库中,因此会比生产者稍微复杂一点)

public  void consumeKafka(String topic, String server) throws Exception {
        log.info("开始消费");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", server);
    FlinkKafkaConsumer</span>&lt;String&gt; consumer = <span style="color: rgba(0, 0, 255, 1)">new</span> FlinkKafkaConsumer&lt;&gt;(topic, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> SimpleStringSchema(), properties);
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">从最早开始消费</span>

consumer.setStartFromLatest();

    DataStream</span>&lt;String&gt; stream =<span style="color: rgba(0, 0, 0, 1)"> env.addSource(consumer);
    DataStream</span>&lt;Tuple4&lt;String, String, String, String&gt;&gt; sourceStreamTra = stream.filter(<span style="color: rgba(0, 0, 255, 1)">new</span> FilterFunction&lt;String&gt;<span style="color: rgba(0, 0, 0, 1)">() {
        @Override
        </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">boolean</span> filter(String value) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
            Boolean flag </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
            JSONObject obj </span>=<span style="color: rgba(0, 0, 0, 1)"> JSONObject.parseObject(value);
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(obj.containsKey("test"<span style="color: rgba(0, 0, 0, 1)">)){
                flag </span>= <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> StringUtils.isNotBlank(value) &amp;&amp;<span style="color: rgba(0, 0, 0, 1)"> flag;
        }
    }).map(</span><span style="color: rgba(0, 0, 255, 1)">new</span> MapFunction&lt;String, Tuple4&lt;String, String, String, String&gt;&gt;<span style="color: rgba(0, 0, 0, 1)">() {
        </span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">final</span> <span style="color: rgba(0, 0, 255, 1)">long</span> serialVersionUID = 1L<span style="color: rgba(0, 0, 0, 1)">;
        @Override
        </span><span style="color: rgba(0, 0, 255, 1)">public</span> Tuple4&lt;String, String, String, String&gt;<span style="color: rgba(0, 0, 0, 1)"> map(String value)
                </span><span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
            JSONObject obj </span>=<span style="color: rgba(0, 0, 0, 1)"> JSONObject.parseObject(value);
            String dataBase </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
            String table </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
            </span><span style="color: rgba(0, 0, 255, 1)">if</span>(obj.containsKey("database"<span style="color: rgba(0, 0, 0, 1)">)){
                dataBase </span>= obj.getString("database"<span style="color: rgba(0, 0, 0, 1)">);
                table </span>= obj.getString("table"<span style="color: rgba(0, 0, 0, 1)">);
            }
            
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Tuple4&lt;String, String, String, String&gt;(server ,topic, dataBase+"-&gt;"+<span style="color: rgba(0, 0, 0, 1)">table, obj.toString());
        }
    });
    sourceStreamTra.addSink(</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> MongoSink());
    env.execute();
}</span></pre>
public class MongoSink extends RichSinkFunction<Tuple4<String, String, String, String>> {
    private static final long serialVersionUID = 1L;
    MongoClient mongoClient = null;
//    MongoCollection mongoCollection = null;

    @Override
    public void invoke(Tuple4<String, String, String, String> value) throws Exception {
        KafkaRecord kafkaRecord = new KafkaRecord("", value.f0 , value.f1, value.f2, value.f3, new Date(new Timestamp(System.currentTimeMillis()).getTime()));
    </span><span style="color: rgba(0, 0, 255, 1)">if</span>(mongoClient != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">){
        mongoClient </span>=<span style="color: rgba(0, 0, 0, 1)"> MongoDBUtil.getConnect();
        MongoDatabase db </span>= mongoClient.getDatabase("databBase"); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 是自己的数据库</span>
        MongoCollection mongoCollection = db.getCollection("kafkaRecord"<span style="color: rgba(0, 0, 0, 1)">);
        mongoCollection.insertOne(</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Document(CommonMethod.objectToMap(kafkaRecord)));
    }

}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> open(Configuration parms) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    </span><span style="color: rgba(0, 0, 255, 1)">super</span><span style="color: rgba(0, 0, 0, 1)">.open(parms);
    mongoClient </span>=<span style="color: rgba(0, 0, 0, 1)"> MongoDBUtil.getConnect();
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> close() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (mongoClient != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
        mongoClient.close();
    }
}

}

import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.io.Serializable;
import java.util.Date;

@Data
@Document(collection = "kafkaRecord")
public class KafkaRecord implements Serializable {
@Field(
"_id")
String id;
// 具体信息
String msg;
//topic
String topic;

String server;

String source;
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">操作时间</span>

Date time;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> KafkaRecord(){

}

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> KafkaRecord(String id, String server, String topic, String source, String msg, Date time){
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.id =<span style="color: rgba(0, 0, 0, 1)"> id;
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.server =<span style="color: rgba(0, 0, 0, 1)"> server;
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.msg =<span style="color: rgba(0, 0, 0, 1)"> msg;
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.topic =<span style="color: rgba(0, 0, 0, 1)"> topic;
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.source =<span style="color: rgba(0, 0, 0, 1)"> source;
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.time =<span style="color: rgba(0, 0, 0, 1)"> time;
}

}

此时消费者也完事了;

启动后端服务,生产者发送一条消息,消费者则拿到该消息存到数据库中;