05-spark streaming & kafka

1、如何消费已经被消费过的数据?

    答:采用不同的 group

2、如何自定义去消费已经消费过的数据?

    Conosumer.properties 配置文件中有两个重要参数

    auto.commit.enable:如果为 true,则 consumer 的消费偏移 offset 会被记录到 zookeeper。下次 consumer 启动时会从此位置继续消费

    auto.offset.reset 该参数只接受两个常量 largest 和 Smallest, 分别表示将当前 offset 指到日志文件的最开始位置和最近的位置。

    实现自定义消费进度还是挺复杂的!这里略,知道有上面两个参数就行

3、kafka partition 和 consumer 数目关系

    1) 如果 consumer 比 partition 多,是浪费,因为 kafka 的设计是在一个 partition 上是不允许并发的,所以 consumer 数不要大于 partition 数 。
    2) 如果 consumer 比 partition 少,一个 consumer 会对应于多个 partitions,这里主要合理分配 consumer 数和 partition 数,否则会导致 partition 里面的数据被取的不均匀
     最好 partiton 数目是 consumer 数目的整数倍,所以 partition 数目很重要,比如取 24,就很容易设定 consumer 数目 -->12 。

    3) 如果 consumer 从多个 partition 读到数据,不保证数据间的顺序性,

     kafka 只保证在一个 partition 上数据是有序的,但多个 partition,根据你读的顺序会有不同

    4) 增减 consumer,broker,partition 会导致 rebalance,所以 rebalance 后 consumer 对应的 partition 会发生变化

4、kafka topic 副本分配?

    Kafka 尽量将所有的 Partition 均匀分配到整个集群上。一个典型的部署方式是一个 Topic 的 Partition 数量大于 Broker 的数量

    1) 如何分配副本:

      Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader,然后无论该 Topic 的 Replication Factor 为多少(也即该 Partition 有多少个 Replica(副本)),
      Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其分区目录下的 Log 中。每个 Follower 都从 Leader pull 数据。

    2)Kafka 分配 Replica 的算法如下:

     将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序
     将第 i 个 Partition 分配到第(i mod n)个 Broker 上
     将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mode n)个 Broker 上

    注:Follower 只提供读服务,可以供消费者读。但是 Fllower 不提供写服务,leader 供生产者写。

5、kafka 如何设置生存周期与清理数据?

    server.properties 找相关配置

6、kafka direct 是什么? 为什么要用这个,有什么优点?和其他的有什么区别。

  1)数据不丢失,数据一定会被处理

      Direct 的方式是会直接操作 kafka 底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。
      即数据一定会被处理。拉数据,是 RDD 在执行的时候直接去拉数据。(如果不开启 wal,Receiver 数据可能会发生丢失,所以有些数据不会被处理)

  2)RDD 和 kafka 的 patition 是一对一

      由于底层是直接读数据,没有所谓的 Receiver,RDD 的 partition 和 kafka 的 partition 是一致的。而 Receiver 的方式,这 2 个 partition 是没任何关系的。
      所以读数据的地方,处理数据的地方和驱动数据处理的程序都在同样的机器上,可以边读边写,这样就可以极大的提高性能。

      不足之处是由于 RDD 和 kafka 的 patition 是一对一的,想提高并行度就会比较麻烦。提高并行度还是 repartition,即重新分区,因为产生 shuffle,很耗时。

  3)不需要开启 wal 机制

      减少了写文件,极大的提升了效率,还至少能节省一倍的磁盘空间

7、Spark-Streaming 获取 kafka 数据的两种方式, 并简要介绍他们的优缺点?

  Receiver 方式:

      当一个任务从 driver 发送到 executor 执行的时候,这时候,将数据拉取到 executor 中做操作,但是如果数据太大的话,这时候不能全放在内存中,
      receiver 通过 WAL,设置了本地存储,他会存放本地,保证数据不丢失,然后使用 Kafka 高级 API 通过 zk 来维护偏移量,保证数据的衔接性,
      其实可以说,receiver 数据在 zk 获取的,这种方式效率低,而且极易容易出现数据丢失

  Direct 方式:

      他使用 Kafka 底层 Api 并且消费者直接连接 kafka 的分区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,
      但是需要自己维护偏移量,迭代计算,即用即取即丢,不会给内存造成太大的压力,这样效率很高

8、 kafka 在高并发的情况下, 如何避免消息丢失和消息重复?

  消息丢失解决方案:

      首先对 kafka 进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后 Kafka 设置 acks=-1,即需要相应的所有处于 ISR 的分区都确认收到该消息后,才算发送成功

  消息重复解决方案:

      1、消息可以使用唯一 id 标识
      2、生产者(acks=1 代表至少成功发送一次 )
      3、消费者 (offset 手动提交,业务逻辑成功处理后,提交 offset)
      4、落表(主键或者唯一索引的方式,避免重复数据)
      5、业务逻辑处理(选择唯一主键存储到 Redis 或者 mongdb 中,先查询是否存在,若存在则不处理;若不存在,先插入 Redis 或 Mongdb, 再进行业务逻辑处理)

9、kafka 怎么保证数据消费一次且仅消费一次

    幂等 producer:保证发送单个分区的消息只会发送一次,不会出现重复消息

    事务 (transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚

    流处理 EOS:流处理本质上可看成是“读取 - 处理 - 写入”的管道。此 EOS 保证整个过程的操作是原子性。注意,这只适用于 Kafka Streams

10、kafka 怎么保证数据的一致性和可靠性

    可以通过 acks 参数设置数据可靠性的级别

    0: 不论写入是否成功,server 不需要给 Producer 发送 Response,如果发生异常,server 会终止连接,触发 Producer 更新 meta 数据;
    1: Leader 写入成功后即发送 Response,此种情况如果 Leader fail,会丢失数据
    -1: 等待所有 ISR 接收到消息后再给 Producer 发送 Response,这是最强保证

11、kafka 到 spark streaming 怎么保证数据完整性,怎么保证数据不重复消费?

    Receiver 方式

      开启 WAL(预写日志),将从 kafka 中接受到的数据写入到日志文件中,所有数据从失败中可恢复。
    Direct 方式

      依靠 checkpoint 机制来保证。(记录消费者组的偏移量)

    重复消费:

      幂等,事务

12、kafka 的消费者高阶和低阶 API 有什么区别?

    kafka 提供了两套 consumer API:The high-level Consumer API 和 The SimpleConsumer API

    high-level consumer API :提供了一个从 kafka 消费数据的高层抽象,

    SimpleConsumer API :则需要开发人员更多地关注细节。

13、kafka 的 exactly-once

    幂等 producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
    事务 (transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚
    流处理 EOS:流处理本质上可看成是“读取 - 处理 - 写入”的管道。此 EOS 保证整个过程的操作是原子性。注意,这只适用于 Kafka Streams

14、如何保证从 Kafka 获取数据不丢失?

    1. 生产者数据的不丢失

        kafka 的 ack 机制:在 kafka 发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。

    2. 消费者数据的不丢失

        通过 offset commit 来保证数据的不丢失,kafka 自己记录了每次消费的 offset 数值,下次继续消费的时候,接着上次的 offset 进行消费即可。

15、 spark 实时作业宕掉,kafka 指定的 topic 数据堆积怎么办?

    应对措施:

      ①spark.streaming.concurrentJobs=10:提高 Job 并发数,从源码中可以察觉到,这个参数其实是指定了一个线程池的核心线程数而已,没有指定时,默认为 1。

      ②spark.streaming.kafka.maxRatePerPartition=2000:设置每秒每个分区最大获取日志数,控制处理数据量,保证数据均匀处理。

      ③spark.streaming.kafka.maxRetries=50:获取 topic 分区 leaders 及其最新 offsets 时,调大重试次数。

      ④在应用级别配置重试

        spark.yarn.maxAppAttempts=5
        # 尝试失败有效间隔时间设置
        spark.yarn.am.attemptFailuresValidityInterval=1h

16、produce 向 kafka 中发送数据产生的 offset 怎么算(给你传入几条大小的消息 求 offset 是多少)

    什么是 offset

      offset 是 consumer position,Topic 的每个 Partition 都有各自的 offset.
      消费者需要自己保留一个 offset,从 kafka 获取消息时,只拉去当前 offset 以后的消息。
      Kafka 的 scala/java 版的 client 已经实现了这部分的逻辑,将 offset 保存到 zookeeper 上

    1) auto.offset.reset

      如果 Kafka 没有开启 Consumer,只有 Producer 生产了数据到 Kafka 中,此后开启 Consumer。
      在这种场景下,将 auto.offset.reset 设置为 largest(最近),那么 Consumer 会读取不到之前 Produce 的消息,只有新 Produce 的消息才会被 Consumer 消费

    2) auto.commit.enable(例如 true,表示 offset 自动提交到 Zookeeper)

    3) auto.commit.interval.ms(例如 60000, 每隔 1 分钟 offset 提交到 Zookeeper)

    4) offsets.storage

      Select where offsets should be stored (zookeeper or kafka). 默认是 Zookeeper

    5) 基于 offset 的重复读

    6) Kafka 的可靠性保证 (消息消费和 Offset 提交的时机决定了 At most once 和 At least once 语义)

    Kafka 默认实现了 At least once 语义(数据一定会被处理,但可能重复消费!)