centos7系统安装kafka

2022-09-08 13:08:32

一、准备
确保服务器上已经搭建完成JDK,zookeeper服务;
如果未搭建完成,请移步参考以下文章:


安装zookeeper:

1.准备

zookeeper官网地址:Apache ZooKeeper

下载安装方式

  • 使用wget命令行下载
    wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
  • 采用下载安装包的方式:Index of /zookeeper/stable

由于要安装在远程服务器上,故采用第一种方式安装;

注意:要下载源码包,否则启动客户端是启动失败;后缀带-bin.tar.gz
启动报错如下:
ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... FAILED TO START
看下zookeeper日志文件具体报错信息:
错误: 找不到或无法加载主类 org.apache.zookeeper.ZooKeeperMain

2、安装与配置

创建和解压
tar -zxvf apache-zookeeper-3.5.9.tar.gz
重命名:mv apache-zookeeper-3.5.9 zookeeper

创建数据存储目录与日志目录
进入zookeeper解压缩后的目录,新建数据文件夹dataDir和日志文件夹dataLogDir
命令:mkdir dataDir和mkdir dataLogDir

conf配置文件
进入配置目录,赋值拷贝样本文件
命令:cp zoo_sample.cfg zoo.cfg

修改 zoo.cfg文件内容
1.修改数据存储文件地址,按照上面建立的目录,小编的如下/opt/software/zookeeper/dataLogDir

[root@localhost kafka-logs]# cat /opt/zk/apache-zookeeper-3.8.0-bin/conf/zoo.cfg 
# The number of milliseconds of each tick
tickTime=2000   #服务心跳时间
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10    #投票选举新leader的初始化时间
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5     #leader与follower心跳检测最大容忍时间
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/opt/zk/apache-zookeeper-3.8.0-bin/data    #数据目录
dataLogDir=/opt/zk/apache-zookeeper-3.8.0-bin/log  #日志目录
pidfile=/var/run/zookeeper.pid
# the port at which the clients will connect
clientPort=2181                                    #zk端口
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
  • 配置环境变量

  1. 打开/etc/profile 文件:vim /etc/profile
  2. 新增以下配置:
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/software/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

3.重新加载配置:source /etc/profile

3、运行

  • 启动命令:./zkServer.sh start
  • 停止命令:./zkServer.sh stop
  • 查看运行状态:./zkServer.sh status

kafka:

kafka官网下载:Apache Download Mirrors

Index of /dist/kafka/2.2.2

二、安装与配置

  • 解压缩tar -zxvf kafka_2.11-2.2.0.tgz

  • 重命名mv kafka_2.11-2.2.0 kafka

[root@localhost kafa]# ls -al
总用量 164020
drwxr-xr-x. 4 root root        95 8月  16 08:26 .
drwxr-xr-x. 8 root root        82 5月  18 16:09 ..
drwxr-xr-x. 8 root root       119 8月  16 08:33 kafka
-rw-r--r--. 1 root root  63999924 8月  15 17:22 kafka_2.11-2.2.0.tgz
-rw-r--r--. 1 root root 103956099 8月  15 16:03 kafka_2.13-3.2.1.tgz
  • 创建logs文件,用于存储kafka日志:
    在kafka安装目录下创建:mkdir kafka-logs
    /opt/kafka/kafka-logs

  • 修改server.properties配置文件

    [root@localhost kafka]# cat config/server.properties  | grep -v "#" | grep -v "^$"
    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.1.36:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafa/kafka/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    解释:

    #一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
    broker.id=1
     
    #listeners=PLAINTEXT://:9092
     
    #advertised.listeners=PLAINTEXT://your.host.name:9092
     
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS
     
    # broker 处理消息的最大线程数,一般情况下不需要去修改
    num.network.threads=3
     
    # broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
    num.io.threads=8
     
    #  socket的发送缓冲区(SO_SNDBUF)
    socket.send.buffer.bytes=102400
     
    # socket的接收缓冲区 (SO_RCVBUF)
    socket.receive.buffer.bytes=102400
     
    # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
    socket.request.max.bytes=104857600
     
    #kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
    log.dirs=/opt/software/kafka/kafka-logs 
    # 每个topic的分区个数,更多的partition会产生更多的segment file
    num.partitions=1
     
    num.recovery.threads.per.data.dir=1
     
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
     
    # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
    #log.flush.interval.messages=10000
     
    # 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
    #log.flush.interval.ms=1000
     
    # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
    log.retention.hours=168
     
    #log.retention.bytes=1073741824
     
    # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
    log.segment.bytes=1073741824
     
    # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
    log.retention.check.interval.ms=300000
     
    # Zookeeper quorum设置。如果有多个使用逗号分割 例如 ip:prot,ip:prot,ip:prot
    zookeeper.connect=localhost:2181
     
    # 连接zk的超时时间
    zookeeper.connection.timeout.ms=6000
     
    # ZooKeeper集群中leader和follower之间的同步实际
    group.initial.rebalance.delay.ms=0

四、运行


1.启动zookeeper服务


由于小编服务器上已经启动过zookeeper服务,故不需要重新执行启动命令;
如果服务器zookeeper服务未启动,则在kafka目录下执行以下命令:
使用安装包中的脚本启动单节点Zookeeper 实例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

注意:以下命令,小编均选择在kafka安装根目录下执行;

2.启动kafka服务


以下方式任选其一

kafka目录启动
使用kafka-server-start.sh 启动kafka 服务:

bin/kafka-server-start.sh config/server.properties


这种命令执行并不是后台进程运行,故使用以下命令

bin/kafka-server-start.sh -daemon config/server.properties


命令需要切换到kafka的bin目录下,

./kafka-server-start.sh -daemon /opt/software/kafka//config/server.properties

3.创建topic。只有一个broker,所以这里一个topic,5个分区,1个副本(不能大于broker数量)

[root@localhost kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic wubo
Created topic wubo.

4 查看描述 Topic 信息

[root@localhost kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic wubo
Topic:wubo	PartitionCount:5	ReplicationFactor:1	Configs:
	Topic: wubo	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: wubo	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: wubo	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: wubo	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: wubo	Partition: 4	Leader: 1	Replicas: 1	Isr: 1

说明:
第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。

5.查看topic

[root@localhost kafka]# kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
jettech
test
wubo

6 .产生消息:kafka-console-producer.sh

[root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic wubo
>"jettech hello world"
>^C[root@localhost kafka]#

使用Ctrl+C退出生成消息;

7. 消费消息:kafka-console-consumer.sh

使用kafka-console-consumer.sh 接收消息并在终端打印
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (这里用删除线标识,并不是代表命名错误,低版本仍然可以适用!!!)

[root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wubo --from-beginning
"jettech hello world"

8.删除topic

kafka-topics.sh --delete --zookeeper localhost:2181 --topic wubo
  • 作者:Michaelwubo
  • 原文链接:https://blog.csdn.net/Michaelwubo/article/details/126358950
    更新时间:2022-09-08 13:08:32