连忙补一下-href=-面试为啥都问Kafka-a-a (妈妈连忙补充句子)
大家好,我是哪吒。
Kafka简直是当今时代背景下数据管道的首选,无论你是做后端开发、还是大数据开发,对它或许都不生疏。开源软件Kafka的运行越来越宽泛。
面对Kafka的遍及和学习热潮,哪吒想分享一下自己多年的开发阅历,率领读者比拟轻松地把握Kafka的相关常识。
一、了解Kafka集成形式
1、什么是Kafka?
Kafka是一个高吞吐量、散布式、可水平裁减的信息传递系统,最后由LinkedIn开发。它的指标是处置海量数据的实时流式处置和传输疑问。
Kafka的外围现实是将数据转化为流,并以颁布-订阅的方式传递。
上图形容了Kafka的外围概念和数据流向。从中可以看出,消费者将信息颁布到主题,消费者订阅主题并处置信息,而主题可以分为多个分区,以支持信息的并行处置和提高可伸缩性。
2、以下是Kafka的关键概念:
二、为什么须要批处置和流处置?
批处置和流处置是Kafka的两种外围处置形式,它们在不同的运行场景中起到关键作用。了解它们的运行背景和差异有助于更好地利用Kafka的后劲。
批处置 是一种将数据按批次搜集和处置的形式。它实用于须要处置少量历史数据的义务,如报表生成、离线数据剖析、批量ETL(Extract,Transform,Load)等。
批处置通常会在固定的时时期隔内运转,处置少量数据并生成结果。它具备以下特点:
流处置 是一种实时数据处置形式,它可以延续地处置流入的数据。它实用于须要实时照应的运行,照实时监控、实时介绍、欺诈检测等。流处置使数据立刻可用,它具备以下特点:
为了充散施展Kafka的长处,咱们须要同时了解和经常使用这两种形式,依据详细需求在批处置和流处置之间切换。例如,在大少数实践运行中,数据会以流的方式进入Kafka,而后可以经过流处置工具启动实时处置,同时,历史数据也可以作为批处置义务周期性地处置。
三、Kafka主题分区战略
1、自动分区战略
Kafka自动的分区战略是Round-Robin,这象征着信息将依次调配给每个分区,确保每个分区接纳相似数量的信息。这种自动战略实用于具备相似数据量和处置需求的分区状况。在这种战略下,Kafka会轮番将信息写入每个分区,以坚持负载的平衡性。关于大少数普通性的运行场景,这种自动战略通常曾经足够了。
2、自定义分区战略
虽然自动分区战略实用于大少数状况,但有时刻你或许须要愈加灵敏的分区战略。这时,你可以经常使用自定义分区战略,依据特定需求将信息路由到不同的分区。最经常出现的状况是,你宿愿确保具备相反键(Key)的信息被写入到同一个分区,以保养信息的有序性。
自定义分区战略的示例代码如下:
publicclassCustomPartitionerimplementsPartitioner{@Overridepublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){List<PartitionInfo>partitions=cluster.partitionsForTopic(topic);intnumPartitions=partitions.size();//依据信息的键来选用分区intpartition=Math.abs(key.hashCode())%numPartitions;returnpartition;}@Overridepublicvoidclose(){//封锁资源}@Overridepublicvoidconfigure(Map<String,?>configs){//性能信息}}
自定义分区战略准许你更灵敏地控制信息的路由方式。在上述示例中,依据信息的键来选用分区,确保具备相反键的信息被写入到同一个分区,以保养它们的有序性。
3、最佳通常:如何选用分区战略
选用分区战略应该依据你的详细需求和运行场景来启动。以下是一些最佳通常倡导:
选用适当的分区战略可以协助你优化Kafka的性能和信息处置方式,确保你的运行能够以最佳方式处置信息。
四、批处置与流处置简介
1、批处置的概念
批处置 是一种数据处置方式,它依照固定的时时期隔或固定的数据量来搜集、处置和剖析数据。批处置实用于那些不须要实时照应的义务,如数据报表生成、大规模数据荡涤、离线数据剖析等。
在批处置中,数据通常存储在一个集中的位置,而后周期性地批量处置。这个处置周期可以是每天、每周或依据业务需求的其余时时期隔。批处置义务会在处置环节中消耗少量资源,由于它须要处置整个数据集。
2、流处置的概念
流处置 是一种实时数据处置方式,它能够延续地处置流入的数据。流处置实用于须要实时照应的运行,照实时监控、实时介绍、欺诈检测等。
在流处置中,数据会立刻被处置,而不须要期待批次的积攒。这使得流处置能够提供低提前的数据处置,以满足实时运行的要求。流处置通罕用于处置事情流,监控传感器数据等须要实时性的数据源。
3、批处置与流处置的区别
批处置和流处置有以下区别:
为了充散施展Kafka的长处,你须要同时了解和经常使用这两种处置形式,并依据详细需求在批处置和流处置之间切换。这将使你的运行能够以最佳方式处置不同类型的数据。
五、Kafka中的批处置
1、批处置运行场景
批处置在许多运行场景中施展着关键作用,特意是在须要处置少量历史数据的义务中。以下是一些批处置运行场景的示例:
运行场景 |
形容 |
报表生成 |
每天、每周或每月生成各种类型的报表,如开售报表、财务报表、经营剖析等。 |
离线数据剖析 |
对历史数据启动深化剖析,以发现趋向、形式和意外状况。 |
数据仓库填充 |
将数据从不同的数据源提取、转换和加载到数据仓库,以供查问和剖析。 |
大规模ETL |
将数据从一个系统转移到另一个系统,通常触及数据荡涤和转换。 |
批量图像处置 |
处置少量图像数据,例如生成缩略图、处置滤镜等。 |
2、批处置架构
典型的批处置架构包括以下组件:
组件 |
形容 |
数据源 |
|
数据处置 |
批处置义务的外围局部,包括数据的提取、转换和加载(ETL),以及任何必要的计算和剖析。 |
数据存储 |
批处置义务时期,两边数据和处置结果的存储位置,通常是相关型数据库、NoSQL数据库、散布式文件系统等。 |
结果生成 |
批处置义务的输入,通常包括生成报表、填充数据仓库等。 |
3、批处置的关键战略
(1)数据缓冲
在批处置中,处置少量数据时须要思考数据缓冲,以提高性能和有效治理数据:
(2)形态治理
形态治理关于批处置十分关键,它有助于确保义务的牢靠口头、复原和容错性:
(3)失误处置
失误处置是批处置环节中的关键局部,可以确保义务的牢靠性和数据品质:
这些战略在批处置中的综合经常使用,可以确保义务以牢靠、高效和容错的方式口头,满足性能和品质需求。依据详细的运行场景,可以依据需求调整这些战略。
4、示例:经常使用Kafka启动批处置
上方是一个繁难的示例,展示如何经常使用Kafka启动批处置。
publicclassKafkaBatchProcessor{publicstaticvoidmn(String[]args){Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("group.id","batch-processing-group");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("batch-data-topic"));//批处置逻辑while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record:records){//处置信息processRecord(record.value());}}}privatestaticvoidprocessRecord(Stringrecord){//成功批处置逻辑System.out.println("Processingrecord:"+record);}}
在这个示例中,咱们创立了一个Kafka消费者,订阅了名为batch-data-topic的信息主题。消费者会活期拉取信息,并调用processRecord方法来处置每条信息。
这个示例展现了如何将Kafka用于批处置义务的数据源,但实践的数据处置逻辑或许愈加复杂,详细取决于运行的需求。批处置义务通常会包括数据提取、转换、处置和结果生成等步骤。
六、Kafka中的流处置
1、流处置运行场景
流处置实用于须要实时照应的运行场景,其中数据不时流入系统并须要立刻处置。以下是一些流处置运行场景的示例:
流处置运行通常须要满足低提前、高吞吐量和高可伸缩性的要求,以确保数据的及时性和品质。
2、流处置架构
流处置架构通常包括以下关键组件:
Kafka在流处置架构中罕用作数据源和数据存储,流处置框架用于处置数据流。这些组件独特单干,使流处置运行能够实时照应和剖析数据。
3、流处置的关键战略
(1)事情时期处置
事情时期处置是流处置的关键战略,特意实用于须要处置带有时期戳的事情数据的状况。事情时期示意事情出现的实践时期,而非数据抵达系统的时期。流处置运行程序须要正确处置事情时期以确保数据的时序性。这包括处置乱序事情、提前事情、重复事情等,以坚持数据的分歧性。
(2)窗口操作
窗口操作是流处置的外围概念,它准许咱们将数据宰割成不同的时期窗口,以启动聚合和剖析。经常出现的窗口类型包括滚动窗口(固定大小的窗口,随时期滚动行进)和滑动窗口(固定大小的窗口,在数据流中滑动)。窗口操作使咱们能够在不同时期尺度上对数据启动摘要和剖析,例如,每分钟、每小时、每天的数据汇总。
(3)依赖处置
流处置运行通常包括多个义务和依赖相关。治理义务之间的依赖相关十分关键,以确保数据按正确的顺序处置。依赖处置包括义务的启动和封锁顺序、数据流的拓扑排序、缺点复原等。这确保了义务之间的分歧性和正确性,尤其在散布式流处置运行中。
这些战略和关键概念独特确保了流处置运行的牢靠性、时效性和正确性。它们是构建实时数据运行的基础,关于不同的运行场景或许须要不同的调整和优化。
4、示例:经常使用KafkaStreams启动流处置
在这个示例中,咱们展示了如何经常使用KafkaStreams启动流处置。以下是示例代码的详细解释:
首先,咱们创立一个Properties对象,用于性能KafkaStreams运行程序。咱们设置了运行程序的ID和Kafka集群的地址。
Propertiesprops=newProperties();props.put(StreamsConfig.LICATION_ID_CONFIG,"stream-processing-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
而后,咱们创立一个StreamsBuilder对象,它将用于构建流处置拓扑。
StreamsBuilderbuilder=newStreamsBuilder();
咱们经常使用builder从名为stream-data-topic的Kafka主题中创立一个输入数据流。
KStream<String,String>source=builder.stream("stream-data-topic");
接上去,咱们对数据流口头一系列操作。首先,咱们经常使用filter操作挑选出蕴含"important-data"的信息。
source.filter((key,value)->value.contains("important-data"))
而后,咱们经常使用mapValues操作将挑选出的信息的值转换为大写。
.mapValues(value->value.toUpperCase())
最后,咱们经常使用to操作将处置后的信息发送到名为output-topic的Kafka主题。
.to("output-topic");
最后,咱们创立一个KafkaStreams对象,将builder.build()和性能属性传递给它,而后启动流处置运行程序。
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);streams.start();
这个示例展现了如何经常使用KafkaStreams轻松地构建流处置运行程序,对信息启动挑选和转换,而后将结果发送到另一个主题。这使得实时数据处置变得相对繁难,且具备高度的可伸缩性和容错性。
七、集成批处置与流处置
1、数据流整合
数据流整合是将批处置和流处置相联合的环节。它准许在处置数据时,依据数据的特性切换处置形式,从而更好地满足运行程序的需求。数据流整合可以经过经常使用不同的工具和库来成功,以便在数据处置环节中无缝切换。
2、数据转换
数据流整合通常须要启动数据转换,以确保数据可以在批处置和流处置之间无缝流转。这或许包括以下方面:
3、数据传递
将数据从批处置传递到流处置,或反之,须要适宜的数据传递机制。Kafka是一个杰出的数据传递工具,由于它可以繁难地支持数据传递。在Kafka中,批处置义务可以将数据写入特定的批处置主题,而流处置义务可以从这些主题中读取数据。这使得批处置和流处置之间的协同变得愈加容易。
4、最佳通常:批处置与流处置的协同运行
当你须要在实践运行中集成批处置与流处置时,上方是一些更详细的操作步骤和示例代码:
步骤1:依据需求选用适宜的处置形式
步骤2:数据转换和数据传递
步骤3:适宜的监控和日志
步骤4:测试和评价
示例代码
以下是一个繁难的示例,展现如何经常使用Kafka作为数据传递机制来集成批处置与流处置。假定咱们有一个批处置义务,它从文件中读取数据并将其写入Kafka主题,而后有一个流处置义务,它从同一个Kafka主题中读取数据并启动实时处置。
批处置义务(经常使用ApacheSpark):
importorg.apache.spark.SparkConf;importorg.apache.spark.api..JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.streaming.Duration;importorg.apache.spark.streaming.api.java.JavaDStream;importorg.apache.spark.streaming.api.java.JavaStreamingContext;importorg.apache.spark.streaming.kafka.KafkaUtils;importjava.util.HashMap;importjava.util.HashSet;importjava.util.Map;importjava.util.Set;publicclassBatchToStreamIntegration{publicstaticvoidmain(String[]args){SparkConfsparkConf=newSparkConf().setAppName("BatchToStreamIntegration");JavaSparkContextsparkContext=newJavaSparkContext(sparkConf);JavaStreamingContextstreamingContext=newJavaStreamingContext(sparkContext,newDuration(5000));Map<String,Integer>topicMap=newHashMap<>();topicMap.put("input-topic",1);JavaDStream<String>messages=KafkaUtils.createStream(streamingContext,"zookeeper.quorum","group",topicMap).map(consumerRecord->consumerRecord._2());messages.foreachRDD((JavaRDD<String>rdd)->{rdd.foreach(record->processRecord(record));});streamingContext.start();streamingContext.awaitTermination();}privatestaticvoidprocessRecord(Stringrecord){System.out.println("Batchprocessingrecord:"+record);}}
流处置义务(经常使用KafkaStreams):
importorg.apache.kafka.streams.KafkaStreams;importorg.apache.kafka.streams.StreamsBuilder;importorg.apache.kafka.streams.StreamsConfig;importorg.apache.kafka.streams.kstream.Consumed;importorg.apache.kafka.streams.kstream.KStream;importjava.util.Properties;publicclassStreamToBatchIntegration{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();props.put(StreamsConfig.APPLICATION_ID_CONFIG,"stream-to-batch-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");StreamsBuilderbuilder=newStreamsBuilder();KStream<String,String>source=builder.stream("input-topic",Consumed.with(Serdes.String(),Serdes.String()));source.foreach((key,value)->{processRecord(value);});KafkaStreamsstreams=newKafkaStreams(builder.build(),props);streams.start();}privatestaticvoidprocessRecord(Stringrecord){System.out.println("Streamprocessingrecord:"+record);}}
这两个示例展示了如何经常使用不同的工具来成功批处置与流处置的集成。
kafka集群一台broker挂掉,解决办法
4台kafka服务器39挂掉了,一直起不来 ,kafka 数据日志目录 服务器器上磁盘被占超过80%,告警数据日志:$KAFKA_HOME/config/ 配置项 操作日志:$KAFKA_HOME/logs目录 在同一台机器上进行不同磁盘的负载均衡,执行 失败之后,重启kafka失败 想重新负载一下,结果失败Partitions reassignment failed due to from /export1/kafka-log/ to /export2/kafka-log/ 参考1、查看正常如下: /usr/hdf/current/kafka-broker/bin/ --broker-list .40.38:9092 --topic filebeat-dwlong-brush --time -1 filebeat-dwlong-brush:2 filebeat-dwlong-brush:1 filebeat-dwlong-brush:3 filebeat-dwlong-brush:0 每个分区offset 正常,如有消费,数值会有增加。
kafka——消费者原理解析
kafka采用发布订阅模式:一对多。发布订阅模式又分两种:
Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了队列模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。
注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。
消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。
在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。最好把消费者的逻辑封装在自己的对象中,然后使用java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上,可参考
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。
关于如何设置partition值需要考虑的因素
Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range,当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。
以上三种现象会使partition的所有权在消费者之间转移,这样的行为叫作再均衡。
再均衡的优点 :
再均衡的缺点 :
RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数最大差别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。
但是,当消费者组内订阅不同主题时,可能造成消费混乱,如下图所示,Consumer0 订阅主题 A,Consumer1 订阅主题 B。
将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能 分配到 Consumer0 中。
Range 方式是按照主题来分的,不会产生轮询方式的消费混乱问题。
但是,如下图所示,Consumer0、Consumer1 同时订阅了主题 A 和 B,可能造成消息分配不对等问题,当消费者组内订阅的主题越多,分区分配可能越不均衡。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
consumer group +topic + partition 唯一确定一个offest
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
你如果特别好奇,实在想看看offset什么的,也可以执行下面操作:
修改配置文件
再启动一个消费者
当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
case1:如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
case2:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
自动提交的优点是方便,但是可能会重复处理消息
不足:broker在对提交请求作出回应之前,应用程序会一直阻塞,会限制应用程序的吞吐量。
因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。
ConsumerRebalanceListener需要实现的两个方法
下面的例子展示如何在失去partition的所有权之前通过onPartitionRevoked()方法来提交偏移量。
Consumer有个Rebalance的特性,即重新负载均衡,该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。
之所以要重新负载均衡,是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时,将组内其他Consumer的负载压力,重新进均匀分配,而不会说新加入一个Consumer就闲在那。
下面就用几张图简单描述一下,各种情况触发Rebalance时,组内成员是如何与协调器进行交互的。
Tips :图中的Coordinator是协调器,而generation则类似于乐观锁中的版本号,每当成员入组成功就会更新,也是起到一个并发控制的作用。
参考:
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。