手机微网站怎么做的,湘潭企业网站建设 磐石网络,wordpress 页面评论,免费的ppt制作软件分布式消息队列kafka【二】—— 基础概念介绍和快速入门 文章目录分布式消息队列kafka【二】—— 基础概念介绍和快速入门Kafka介绍与高性能原因分析Kafka介绍Kafka有哪些特点Kafka高性能的原因是什么#xff1f;Kafka高性能核心pageCache与zeroCopy原理解析PageCache传统应用…分布式消息队列kafka【二】—— 基础概念介绍和快速入门文章目录分布式消息队列kafka【二】—— 基础概念介绍和快速入门Kafka介绍与高性能原因分析Kafka介绍Kafka有哪些特点Kafka高性能的原因是什么Kafka高性能核心pageCache与zeroCopy原理解析PageCache传统应用程序读取磁盘文件和返回给消费者的过程zeroCopy零拷贝Kafka(MQ)实战应用场景剖析Kafka(MQ)的应用场景Kafka(MQ)之异步化实战Kafka(MQ)之服务解耦、削峰填谷Kafka海量日志收集Kafka(MQ)之数据同步实战Kafka之实时计算分析kafka基础概念kafka集群架构topic与partition关系副本概念replicaIn Sync Replicas高水位线kafka快速入门生产者编码操作步骤消息实体后续复用topic常量生产者消费者编码操作步骤消费者Kafka介绍与高性能原因分析Kafka介绍Kafka是LinkedIn开源的分布式消息系统目前归属于Apache顶级项目Kafka主要特点是基于Pull的模式来处理消息消费追求高吞吐量一开始的目的就是用于日志收集和传输0.8版本开始支持复制不支持事务目的追求高吞吐量。对消息的重复、丢失、错误没有严格要求适合产生大量数据的互联网服务的数据收集业务Kafka可以做到消息的可靠性比如一条不丢失但这样对Kafka的性能有一定的降低正常情况即然选择了Kafka就会容忍它极少的数据丢失比如十几亿的日志收集丢了两三条问题不大Kafka有哪些特点具有分布式特性支持消息分区概念核心概念partition(分区)一个topic(主题)下可以有多个partitionpartition和consumer(消费者)是一一对应的每个topic的某一个partition只能被同一个消费组下的其中一个consumer消费同组的consumer则起到均衡效果。因此我们可以说分区是消费并行度的基本单位。从consumer的角度讲我们订阅消费了一个topic也就订阅了该topic的所有partition。当消费者数量多于partition的数量时多余的消费者空闲。多个消费者组就是浪费的无意义的。组与组之间的消息是否被消费是相互隔离互不影响的跨平台支持不同语言的客户端比如java、php、python等对于异构的系统使用友好实时性数据支持实时处理和一键处理即使Kafka数据堆积上亿只要存储OK不会影响Kafka性能不会影响Kafka消息的接收和发送性能伸缩性支持水平扩展Kafka高性能的原因是什么顺序写PageCache空中接力高效读写顺序写就是将消息不断追加写入本地磁盘随机写是指可以在任何时候将存取文件的指针指向文件内容的任何位置。顺序写的性能是随机写的万倍。PageCache详解见下高性能、高吞吐初衷日志收集。后台异步主动Flush好多异步级别的scheduler将连续的小块组成一个大块的物理文件。文件按顺序排好减少磁盘移动时间充分利用空闲的内存。预读策略IO调度zeroCopyKafka高性能核心pageCache与zeroCopy原理解析PageCachePageCache是系统级别的缓存它把尽可能多的空闲内存当作磁盘缓存使用来进一步提高IO效率同时当其他进程申请内存回收PageCache的代价也很小。当上层有写操作时操作系统只是将数据写入PageCache同时标记Page属性为Dirty。当读操作发生时先从PageCache中查找如果发生缺页才进行磁盘调度最终返回需要的数据。PageCache同时可以避免在JVM内部缓存数据避免不必要的GC、以及内存空间占用。对于In-Process Cache如果Kafka重启它会失效而操作系统管理的PageCache依然可以继续使用。对应到Kafka生产和消费消息中producer把消息发到broker后数据并不是直接落入磁盘的而是先进入PageCache。PageCache中的数据会被内核中的处理线程采用同步或异步的方式写回到磁盘。Consumer消费消息时会先从PageCache获取消息获取不到才回去磁盘读取并且会预读出一些相邻的块放入PageCache以方便下一次读取。如果Kafka producer的生产速率与consumer的消费速率相差不大那么几乎只靠对broker PageCache的读写就能完成整个生产和消费过程磁盘访问非常少。传统应用程序读取磁盘文件和返回给消费者的过程zeroCopy零拷贝磁盘文件只在内核空间上下文copy一次到内核读取缓冲区然后直接写到网卡给消费者。不同消费者可以使用同一个内核读取缓冲区的磁盘文件数据避免重复复制。Kafka(MQ)实战应用场景剖析Kafka(MQ)的应用场景Kafka之异步化、服务解耦、削峰填谷Kafka海量日志收集Kafka之数据同步应用Kafka之实时计算分析Kafka(MQ)之异步化实战如图假设线上服务出现BUG有一个error日志告警此时需要把这个error日志以短信、邮件、第三方推送等方式通知用户如果都集中到单台服务器压力过大。我们可以把这些推送封装为一个个请求推送代理服务是一个生产者的多集群形式需要持久化的请求持久化存储将请求按照类型划分发送到具体处理的某个集群比如是邮件consumer服务集群推送请求经过某个推送代理服务集群生产一条消息发送到邮件的topic中邮件consumer服务集群的接收到topic中的消息处理请求并发送邮件比如某段时间同时生产30个请求邮件consumer服务集群有三个节点可以做负载均衡每个节点处理10个请求。从而实现了异步化的处理还启到了服务解耦的作用。Kafka(MQ)之服务解耦、削峰填谷如图实现了订单系统和物流系统的服务解耦而且比如双十一订单量极具增加的情况下用户更新订单状态后发送到MQ可以通过延迟消费将处理不过来的消息延迟到并发量小的时间段消费从而实现削峰填谷的作用。Kafka海量日志收集如图图中左边的Log4j2 Appender是应用服务集群会产生一个全量日志app.log和错误日志error.log。然后通过filebeat日志抓取插件抓取这两个日志文件推送到kafka中。如果只推送全量日志这就无法保证error日志的即时处理同时kafka的topic也要分开。kafka的数据再流入下游logstash中对日志进行解析操作解析完的json数据再存储到es中。这就是一套比较完整的kafka海量日志收集系统。Kafka(MQ)之数据同步实战如图比如订单的创建订单的流量非常大时我们持久化到数据库层肯定需要一些分库分表的策略。怎么去做整体维度的统计分析和查询连接这些数据库并行的查询然后做结果的汇总吗肯定不是这么去做的我们需要把这些数据库的所有表的数据统一到一个地方进行查询。目前市面上比较主流的是同步到es、HBase、redis等。图中cannal是解析Mysql Binlog实时同步的中间件。Mysql Binlog怎么解析需要开启Mysql Binlog指定按照一行行的读取读取之后发送到MQ。然后由consumer消费一条条的Binlog数据解析Binlog数据同步到es上。这就是一套比较完整的kafka数据同步系统。另外还有的公司采用这种方式实现数据同步有些数据同步是直接在订单创建的时候先入库后再发到MQMQ再将数据同步到es中。这种方式其实是不可靠的无法保证入库后再发到MQ的原子性这种就需要实现像RabbitMQ的可靠性投递。最可靠的是已经落库的数据从这个源头去同步更为简单。比如图中这种方式。Kafka之实时计算分析如图数据采集一些application应用发送到kafkakafka直接对接Flink实时计算平台Flink对数据进行一些统计分析、聚合然后再把数据输出到Destination。kafka基础概念kafka集群架构中间Kafka broker代表三个Kafka节点奇数个稳定。Producer会将消息发送到Kafka集群Consumer会拉取Kafka集群消息到本地处理。Kafka集群的维护通过元数据管理中心Zookeeper实现。Kafka集群大部分都是内存级别的存储如果Producer的生产速率与Consumer的消费速率相差不大甚至完全不需要磁盘此时磁盘只做备份的作用replicate表示数据同步是基于内存级别的假设此时有三个节点相同的一份消息在内存里存了三份当某一两个节点宕机不会有数据丢失但是三个节点都宕机会有部分数据丢失当然kafka也有相关的配置保证数据不丢失但是会影响性能。topic与partition关系Topic主题逻辑概念Partition分区可以简单的理解为物理概念在存储层面上可以理解为一个个可以追加的日志文件。如图TopicA由1、2、3、4四个分区注意一个分区只能属于单个主题同一个主题下可以由多个分区。消息被追加到分区里都会临时分配偏移量offset。偏移量offset在分区中是一个唯一的标识。对于这个偏移量offset的追加方式是顺序写的过程。Kafka可以保证分区是有序的不是主题是有序的。怎么保证消息的有序只需要保证一个消费者/一个线程去连接一个分区即可。每一条消息在发送到kafka broker之前会根据一个分区的规则分区器通过这个规则可能是路由规则、hash规则等把消息变成0、1、2、3这样的分区序号然后把消息打到指定的分区上。如果某个主题上只有一个分区即一个磁盘文件那所有操作的IO都会产生在这一个磁盘文件上会产生资源的瓶颈。分区的目的就是为了分散磁盘的IO。创建主题时可以指定分区数当然后续也可以修改某个主题的分区数。通过增加分区可以更好的实现水平扩展性。副本概念replicaBroker代表实际的物理机器节点绿色的P代表leader副本紫色的P代表follower副本。可以通过增加follower副本的数量提升集群的容灾能力比如这里Broker3宕机还可以从Borker1和2拼接1、2、3、4四个副本保证数据不丢失。每个partition可以有多个副本。leader副本每个partition只能有一个leader副本所有的producer的请求都会发送到这个leader 副本当然consumer可以从leader上consume数据也可以从follower replica中consume数据。follower副本partition中除了leader副本之外的副本就称之为follower副本follower副本的数目可以自由配置不像leader只能有一个。它的主要任务就是保持和leader同步当旧的leader副本出现问题的时候它能够快速被晋级为leader副本从而保证高可用性。我们可以从follower副本中读也可以只从leader读这个是可以配置的。需要注意的是同一时刻leader副本和follower副本的数据并非完全一致之间存在主从概念会有数据同步的过程。绿色的leader副本可以处理读写请求紫色的follower副本只负责去实时拉取绿色的leader副本的数据进行数据同步。In Sync ReplicasARAssigned Replicas分区中的所有副本统称为ARISRIn Sync Replicas所有与leader副本保持一定程度同步的副本包括leader副本在内组成ISROSROut Sync Relipcas与leader副本同步滞后过多的副本不包括leader副本组成OSR由此可见ARISROSR。在正常情况下所有的follower副本都应该与leader副本保持一定程度的同步即ARISR,OSR集合为空。图中绿色的P1代表leader副本紫色的P1 S1和P1 S2代表follower副本当外部请求写入消息到P1 leader副本后follower副本S1和S2还需要拉取P1数据并写入。leader副本主要维护和跟踪ISR集合里所有follower副本的滞后状态。当OSR集合中的follower副本同步的进度跟上了会把OSR集合中的follower副本写回ISR集合。当leader副本所在的节点宕机只会从ISR集合中选取follower副本成为leader副本。高水位线HWHigh Watermark高水位线消费者只能最多拉取到高水位线的消息LEOLog End Offset日志文件的最后一条记录的offset偏移量ISR集合与HW和LEO存在着密不可分的关系如图HW在6的位置故消费者最多可消费的消息区间是在0-5之间不能消费6-8的数据LEO代表最新的日志文件写入在9的位置。HW高水位线代表消费者只能最多拉取到高水位线的消息。leader副本在写入消息后follow副本还没有成功拉取消息所以此时消息还无法消费。follow1同步数据很快follow2同步3消息很快但是同步4消息很慢此时HW会一直在3这个位置上。当然这个可以通过选择策略改变可以选择一条消息发送出去后所有副本必须都同步成功才能返回结果才能拉取这个offset消息消费。也可以选择半数以上同步成功即可拉取这个消息消费。kafka快速入门生产者编码操作步骤配置生产者参数属性和创建生产者对象构建消息ProducerRecord发送消息send关闭生产者消息实体后续复用packagecom.bfxy.kafka.api.model;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjava.io.Serializable;DataAllArgsConstructorNoArgsConstructorpublicclassUser/*implements Serializable*/{// 不需要实现Serializable接口因为kafka对VALUE已经做了序列化privateStringid;privateStringname;}topic常量packagecom.bfxy.kafka.api.constant;publicinterfaceConst{StringTOPIC_QUICKSTARTtopic_quickstart;}生产者packagecom.bfxy.kafka.api.quickstart;importcom.alibaba.fastjson.JSON;importcom.bfxy.kafka.api.constant.Const;importcom.bfxy.kafka.api.model.User;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;publicclassQuickStartProducer{publicstaticvoidmain(String[]args){// 1.配置生产者启动的关键属性参数PropertiespropertiesnewProperties();// 1.1.连接kafka集群的服务列表如果有多个使用”逗号“隔开properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.218.21:9092);// 1.2.CLIENT_ID_CONFIG这个属性的目的是标记kafkaClient的IDproperties.put(ProducerConfig.CLIENT_ID_CONFIG,quickstart-producer);// 1.3.对kafka的key以及value做序列化KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG// Q为什么需要对kafka的key以及value做序列化// A因为Kafka Broker在接收消息的时候必须要以二进制的方式接收所以必须要对key以及value做序列化// 字符串序列化类可以直接用后面这串字符串org.apache.kafka.common.serialization.StringSerializer// KEY是kafka用于做消息投递计算具体投递到对应主题的哪一个分区而需要的properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// VALUE实际发送消息的内容properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// try的括号种进行创建资源连接的写法的作用在try语句结束后自动释放前提是这些可关闭的资源必须实现 java.lang.AutoCloseable 接口。此时就不用再finally中进行资源的释放了。// 2.创建kafka生产者对象传递properties属性参数集合try(KafkaProducerString,StringproducernewKafkaProducer(properties)){for(inti0;i10;i){// 3.构造消息内容UserusernewUser(00i,张三);// arg1topicarg2实际的消息体内容ProducerRecordString,StringrecordnewProducerRecord(Const.TOPIC_QUICKSTART,JSON.toJSONString(user));// 4.发送消息producer.send(record);}}catch(Exceptione){e.printStackTrace();}}}消费者编码操作步骤配置消费者参数属性和构造消费者对象订阅主题拉取消息并进行消费处理提交消费偏移量关闭消费者消费者packagecom.bfxy.kafka.api.quickstart;importcom.bfxy.kafka.api.constant.Const;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Collections;importjava.util.List;importjava.util.Properties;publicclassQuickStartConsumer{publicstaticvoidmain(String[]args){// 1.配置属性参数PropertiespropertiesnewProperties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.218.21:9092);// 字符串反序列化类可以直接用后面这串字符串org.apache.kafka.common.serialization.StringDeserializerproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 非常重要的属性配置与消费者订阅组有关系properties.put(ConsumerConfig.GROUP_ID_CONFIG,quickstart-group);// 常规属性会话连接超时时间默认是10000properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10000);// 消费者提交offset自动提交手动提交默认是自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 自动提交的提交周期默认5000properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);// 2.创建消费者对象try(KafkaConsumerString,StringconsumernewKafkaConsumer(properties)){// 3.订阅感兴趣的主题consumer.subscribe(Collections.singletonList(Const.TOPIC_QUICKSTART));System.out.println(quickstart consumer started...);// 4.采用拉取消息的方式消费数据while(true){// 设置等待多久拉取一次消息ConsumerRecordsString,Stringrecordsconsumer.poll(Duration.ofMillis(1000));// 拉取Const.TOPIC_QUICKSTART主题里面所有的消息// topic和partition是一对多的关系一个topic可以有多个partition// 因为消息是在partition里存储的所以需要遍历partition集合for(TopicPartitionpartition:records.partitions()){// 通过TopicPartition获取指定的消息集合获取到的就是当前TopicPartition里面所有的消息ListConsumerRecordString,StringpartitionRecordListrecords.records(partition);// 获取当前TopicPartition对应的主题名称Stringtopicpartition.topic();// 获取当前TopicPartition下的消息条数intsizepartitionRecordList.size();System.out.println(String.format(--- 获取topic%s分区位置%s消息总数%s ---,topic,partition.partition(),size));for(ConsumerRecordString,StringconsumerRecord:partitionRecordList){// 实际的消息内容StringvalueconsumerRecord.value();// 当前获取的消息偏移量longoffsetconsumerRecord.offset();// 提交的消息偏移量如果要提交的话必须提交当前消息的offset1表示下一次从什么位置offset拉取消息longcommitOffsetoffset1;System.out.println(String.format(-- 获取实际消息value%s消息offset%s提交offset%s ---,value,offset,commitOffset));}}}}catch(Exceptione){e.printStackTrace();}}}