C# 使用kafka 消费者-生产者订阅
一:kafka 介绍
kafka(官网地址:http://kafka.apache.org)是一种高吞吐量的分布式发布订阅的消息队列系统,具有高性能和高吞吐率。
1.1 术语介绍
- Broker
Kafka 集群包含一个或多个服务器,这种服务器被称为 broker
- Topic
主题:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
- Partition
分区:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition.(一般为 kafka 节点数 cpu 的总核数)
- Producer
生产者,负责发布消息到 Kafka broker
- Consumer
消费者:从 Kafka broker 读取消息的客户端。
- Consumer Group
消费者组:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
1.2 基本特性
可扩展性
在不需要下线的情况下进行扩容
数据流分区 (partition) 存储在多个机器上
高性能
单个 broker 就能服务上千客户端
单个 broker 每秒种读 / 写可达每秒几百兆字节
多个 brokers 组成的集群将达到非常强的吞吐能力
性能稳定,无论数据多大
Kafka 在底层摒弃了 Java 堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合 Zero-Copy 的特性极大地改善了 IO 性能。
1.3 消息格式
一个 topic 对应一种消息格式,因此消息用 topic 分类
一个 topic 代表的消息有 1 个或者多个 patition(s) 组成
一个 partition 应该存放在一到多个 server 上,如果只有一个 server,就没有冗余备份,是单机而不是集群;如果有多个 server,一个 server 为 leader(领导者),其他 servers 为 followers(跟随者),leader 需要接受读写请求,followers 仅作冗余备份,leader 出现故障,会自动选举一个 follower 作为 leader,保证服务不中断;每个 server 都可能扮演一些 partitions 的 leader 和其它 partitions 的 follower 角色,这样整个集群就会达到负载均衡的效果
消息按顺序存放;消息顺序不可变;只能追加消息,不能插入;每个消息都有一个 offset,用作消息 ID, 在一个 partition 中唯一;offset 有 consumer 保存和管理,因此读取顺序实际上是完全有 consumer 决定的,不一定是线性的;消息有超时日期,过期则删除
1.4 原理解析
producer 创建一个 topic 时,可以指定该 topic 为几个 partition(默认是 1,配置 num.partitions),然后会把 partition 分配到每个 broker 上,分配的算法是:a 个 broker,第 b 个 partition 分配到 b%a 的 broker 上,可以指定有每个 partition 有几分副本 Replication,副本的分配策略为:第 c 个副本存储在第(b+c)%a 的 broker 上。一个 partition 在每个 broker 上是一个文件夹,文件夹中文件的命名方式为:topic 名称 + 有序序号。每个 partition 中文件是一个个的 segment,segment file 由.index 和.log 文件组成。两个文件的命名规则是,上一个 segmentfile 的最后一个 offset。这样,可以快速的删除 old 文件。
producer 往 kafka 里 push 数据,会自动的 push 到所有的分区上,消息是否 push 成功有几种情况:1,接收到 partition 的 ack 就算成功,2 全部副本都写成功才算成功;数据可以存储多久,默认是两天;producer 的数据会先存到缓存中,等大小或时间达到阈值时,flush 到磁盘,consumer 只能读到磁盘中的数据。
consumer 从 kafka 里 poll 数据,poll 到一定配置大小的数据放到内存中处理。每个 group 里的 consumer 共同消费全部的消息,不同 group 里的数据不能消费同样的数据,即每个 group 消费一组数据。
consumer 的数量和 partition 的数量相等时消费的效率最高。这样,kafka 可以横向的扩充 broker 数量和 partitions;数据顺序写入磁盘;producer 和 consumer 异步
二:环境搭建(windows)
这里小编上一篇文章已经讲过如何在 windows 搭建 kafka -> https://www.cnblogs.com/IT-Ramon/p/12017745.html
三:基于.net 的常用类库
基于.net 实现 kafka 的消息队列应用,常用的类库有 kafka-net,Confluent.Kafka,官网推荐使用 Confluent.Kafka,本文也是基于该库的实现,使用版本预发行版 1.0.0-beta2,创建控制台应用程序。
四:应用–生产者
生产者将数据发布到指定的主题,一般生产环境下的负载均衡,服务代理会有多个,BootstrapServers 属性则为以逗号隔开的多个代理地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | using Confluent.Kafka; using System; namespace kafka.Producer { public class KafkaProducer { /// <summary> /// 生产者 /// </summary> public static void Produce( string message) { var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; using ( var producer = new Producer<Null, string >(config)) { // 错误日志监视 producer.OnError += (_, msg) => { Console.WriteLine($ "Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}" ); }; // 异步发送消息到主题 producer.BeginProduce( "MyTopic" , new Message<Null, string > { Value = message }, r => { Console.WriteLine(!r.Error.IsError ? $ "Delivered message to {r.TopicPartitionOffset}" : $ "Delivery Error: {r.Error.Reason}" ); }); // 3后 Flush到磁盘 producer.Flush(TimeSpan.FromSeconds(3)); } } } } |
控制台调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | using System; namespace kafka.Producer.App { class Program { static void Main( string [] args) { while ( true ) { var s = Console.ReadLine(); KafkaProducer.Produce(s); } } } } |
五:应用–消费者
消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器
如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载平衡。
如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程
上图为两个服务器 Kafka 群集,托管四个分区(P0-P3),包含两个消费者组。消费者组 A 有两个消费者实例,B 组有四个消费者实例。
默认 EnableAutoCommit 是自动提交,只要从队列取出消息,偏移量自动移到后一位,无论消息后续处理成功与否,该条消息都会消失,所以为免除处理失败的数据丢失,消费者方可设置该属性为 false,后面进行手动 commint() 提交偏移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | using Confluent.Kafka; using System; namespace kafkaTest { public class KafkaConsumer { /// <summary> /// 消费者 /// </summary> public static void Consumer() { var conf = new ConsumerConfig { GroupId = "test-consumer-group" , BootstrapServers = "localhost:9092" , AutoOffsetReset = AutoOffsetResetType.Earliest, EnableAutoCommit = false // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失 }; using ( var consumer = new Consumer<Ignore, string >(conf)) { // 订阅topic consumer.Subscribe( "MyTopic" ); // 错误日志监视 consumer.OnError += (_, msg) => { Console.WriteLine($ "Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}" ); }; while ( true ) { try { var consume = consumer.Consume(); string receiveMsg = consume.Value; Console.WriteLine($ "Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'." ); // 开始我的业务逻辑 } catch (ConsumeException e) { Console.WriteLine($ "Consumer_Error occured: {e.Error.Reason}" ); } } } } } } |
控制台调用
1 2 3 4 5 6 7 8 9 10 | namespace kafkaTest { class Program { static void Main( string [] args) { KafkaConsumer.Consumer(); } } } |
效果图
常见数据问题处理
- 重复消费最常见的原因:re-balance 问题, 通常会遇到消费的数据,处理很耗时,导致超过了 Kafka 的 session
timeout 时间(0.10.x 版本默认是 30 秒),那么就会 re-balance 重平衡,此时有一定几率 offset 没提交,会导致重平衡后重复消费。
去重问题:消息可以使用唯一 id 标识 - 保证不丢失消息: 生产者(ack= -1 或 all 代表至少成功发送一次 ) 消费者
(offset 手动提交,业务逻辑成功处理后,提交 offset) - 保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到 Redis 或者 mongdb 中,先查询是否存在,若存在则不处理;若不存在,先插入 Redis 或 Mongdb, 再进行业务逻辑处理)
Kafka 可视化调试
借助可视化客户端工具 kafka tool
具体使用可参考:https://www.cnblogs.com/frankdeng/p/9452982.html