Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

2022年7月4日13:17:48

kafka概述

Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

kafka架构及核心概念

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。

kafka部署及使用

安装之前需要事先安装zookeeper集群,因为我之前安装CM已经装了zookeeper,这里就不在安装了

1. 集群模式部署

1)上传安装包并解压

[root@hadoop103 software]# tar -zxvf kafka_2.11-0.11.0.2.tgz -C /opt/module/

2)修改解压后的文件名称

[root@hadoop103 software]# cd /opt/module/[root@hadoop103 module]# ll
total 0
drwxr-xr-x 6 root root 89 Nov 11  2017 kafka_2.11-0.11.0.2[root@hadoop103 module]# mv kafka_2.11-0.11.0.2 kafka-0.11.0.2[root@hadoop103 module]#

3)在/opt/module/kafka目录下创建logs文件夹和data文件夹,做到日志和数据目录分离开

[root@hadoop103 kafka-0.11.0.2]# mkdir logs[root@hadoop103 kafka-0.11.0.2]# mkdir data

4)修改配置文件

[root@hadoop103 kafka-0.11.0.2]# cd config[root@hadoop103 config]# vim server.properties

输入以下内容:

#broker的全局唯一编号,不能重复
broker.id=0#删除topic功能使能
delete.topic.enable=true
auto.create.topics.enable = false#处理网络请求的线程数量
num.network.threads=3#用来处理磁盘IO的现成数量
num.io.threads=8#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小
socket.request.max.bytes=104857600#kafka数据存放的路径,日志会默认放在logs路径下
log.dirs=/opt/module/kafka-0.11.0.2/data#topic在当前broker上的默认分区个数
num.partitions=1#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,超时将被删除
log.retention.hours=168#配置连接Zookeeper集群地址
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

5)配置环境变量

[root@hadoop103 config]# vim /etc/profile#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-0.11.0.2
export PATH=$PATH:$KAFKA_HOME/bin
[root@hadoop103 config]# source /etc/profile[root@hadoop103 config]# echo $KAFKA_HOME/opt/module/kafka-0.11.0.2[root@hadoop103 config]#

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

6)分发安装包

[root@hadoop103 module]# cpush slave: kafka-0.11.0.2 /opt/module/[root@hadoop103 module]# cexec slave: 'ls /opt/module/'[root@hadoop103 module]# cpush slave: /etc/profile /etc/

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

注意:如果没有分发环境变量配置文件,需要各自配置其他机器的环境变量

7)分别在hadoop104和hadoop105上修改配置文件/opt/module/kafka-0.11.0.2/config/server.properties中的broker.id=1broker.id=2

注:broker.id不得重复

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

8)启动集群
启动kfaka之前要先启动zookeeper集群(因为我这里已经启动了,所以不需要重新启动zookeeper服务)

#zookeeper启动命令
bin/zkServer.shstart

然后依次在hadoop103、hadoop104、hadoop105节点上启动kafka,并使用jps/jps -m 查看进程

[root@hadoop103 kafka-0.11.0.2]# bin/kafka-server-start.sh -daemon config/server.properties[root@hadoop104 kafka-0.11.0.2]# bin/kafka-server-start.sh -daemon config/server.properties[root@hadoop105 kafka-0.11.0.2]# bin/kafka-server-start.sh -daemon config/server.properties

hadoop103
Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka
hadoop104
Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka
hadoop105
Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka
9)关闭集群命令

bin/kafka-server-stop.sh stop

10)Kafka的使用

  • 创建topic
kafka-topics.sh--create--zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181--replication-factor 1--partitions 1--topic first#参数说明--zookeeper :zookeeper地址--replication-factor :用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。--partitions  :用来设置分区数
  • 查看topic列表
kafka-topics.sh--list--zookeeper hadoop101:2181

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

  • 生产者生产数据
kafka-console-producer.sh--broker-list hadoop103:9092--topic first#first是topic的名字#broker指的是kafka的服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。#broker-list指定集群中的一个或者多个服务器,一般我们再使用console producer的时候,这个参数是必备参数
  • 消费者消费数据
kafka-console-consumer.sh--bootstrap-server hadoop103:9092--topic first# 通过以上命令,可以看到消费者可以接收生产者发送的消息# bootstrap-servers指的是目标集群的服务器地址,新版本之后(新版本指的是kafka 0.8.0之后的版本)开始使用 --bootstrap-server代替 --zookeeper# 这个和broker-list功能是一样的,只不过我们在console producer要求用后者。# 如果需要从头开始接收数据,需要添加--from-beginning参数
kafka-console-consumer.sh--bootstrap-server hadoop103:9092--from-begin#ning --topic first

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

如上图左边是生产者生产了三条数据,右边是消费者消费情况,可以看到三条数据都可以接收到

  • 消费者组
[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server master:9092 --topic mytest --consumer-property group.id=group_mytes

2. 单节点多broker模式部署

参考官网:中文文档
Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

kafka容错性测试

创建一个有三个副本一个分区的topic

[root@hadoop103 ~]# kafka-topics.sh --create --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --replication-factor 3 --partitions 1 --topic mytest
Created topic"mytest".

查看该topic的详细信息,使用describe命令

[root@hadoop103 ~]# kafka-topics.sh --describe --zookeeper hadoop101:2181 --topic mytest
Topic:mytest	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: mytest	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

leader 是在给出的所有partitons中负责读写的节点,每个节点都有可能成为leader
replicas 显示给定partiton所有副本所存储节点的节点列表,不管该节点是否是leader或者是否存活。
ISR 副本都已同步的节点集合,这个集合中的所有节点都是存活状态,并且跟leader同步;如果有节点挂掉,Kafka会自动收缩 ISR 集合,将该副本“踢出”ISR

创建生产者和消费者并消费数据

#生产者
kafka-console-producer.sh--broker-list hadoop103:9092,hadoop104:9092,hadoop105:9092--topic mytest#消费者
kafka-console-consumer.sh--bootstrap-server hadoop105:9092,hadoop103:9092,hadoop104:9092--topic mytest

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

  • kill其中的一个broker,先kill非leader节点

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

我这里kill的是broker1节点,然后再次生产数据,查看消费者是否可以接收到数据,并查看isr的变化从,下图可以看到broker1节点已经被踢出了,但这时候不影响消费者消费数据

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

  • kill Leader节点,然后再次进行消费

Spark Streaming实时流处理项目实战(四)分布式消息队列Kafka

可以看到消费者依然可以消费到数据,说明有三个副本的时候,可以允许两个节点挂掉(上图中WANG是警告不是报错,是正常现象)

Kafka副本机制
一、什么是副本机制:
通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝
二、副本机制的好处:
1、提供数据冗余
系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性
2、提供高伸缩性
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量
3、改善数据局部性
允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
参考:kafka副本机制

  • 作者:怒上王者
  • 原文链接:https://blog.csdn.net/weixin_36815898/article/details/113943006
    更新时间:2022年7月4日13:17:48 ,共 5365 字。