Installing and Starting Kafka

September 12, 2022, 10:14:56

Installing Kafka
Get the download URL, then download and extract the archive.

$ wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
$ tar -zxvf kafka_2.12-2.5.0.tgz && cd kafka_2.12-2.5.0
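
The download step can be parameterized so that the version numbers are written only once. The following is a minimal sketch, not part of the original walkthrough: the function name kafka_download_url is hypothetical, and the default base URL https://archive.apache.org/dist/kafka is an assumption (a mirror may stop hosting old releases, so the base can be overridden).

```shell
#!/bin/sh
# Hypothetical helper: build the tarball URL from the Scala and Kafka versions.
# The default base URL is an assumption; pass a third argument to use another mirror.
kafka_download_url() {
    scala_version=$1
    kafka_version=$2
    base_url=${3:-https://archive.apache.org/dist/kafka}
    printf '%s/%s/kafka_%s-%s.tgz\n' \
        "$base_url" "$kafka_version" "$scala_version" "$kafka_version"
}

# Print the URL for the release used in this article (Scala 2.12, Kafka 2.5.0).
kafka_download_url 2.12 2.5.0
```

It could then be used as wget "$(kafka_download_url 2.12 2.5.0)" in place of the hard-coded URL.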

For details, see the Quickstart on the official Kafka website.

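Starting Kafka
Kafka depends on ZooKeeper, so ZooKeeper must be running first. The Kafka distribution bundles ZooKeeper, so it can be run directly without a separate installation.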

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Running ZooKeeper revealed that Java was not installed on the machine. The error was:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
/root/dev/kafka/bin/kafka-run-class.sh: line 315: exec: java: not found

So Java has to be installed first.
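
A missing java binary can be detected before the start script is run at all. This is a small sketch under the assumption that a POSIX shell is available; the helper name require_cmd is hypothetical and not part of Kafka.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if the given command is on PATH.
require_cmd() {
    if command -v "$1" >/dev/null 2>&1; then
        return 0
    fi
    echo "missing required command: $1" >&2
    return 1
}

# Report whether java is available before trying to start ZooKeeper.
if require_cmd java; then
    echo "java found: $(command -v java)"
else
    echo "install Java before running bin/zookeeper-server-start.sh"
fi
```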

If yum reports an HTTPS error while installing Java, such as a 404 indicating an invalid repository URL, the output looks like this:

Error output from yum install java:
$ yum install java-1.6.0-openjdk
Failed to set locale, defaulting to C
Loaded plugins: fastestmirror
base | 3.6 kB 00:00:00
epel | 4.7 kB 00:00:00
extras | 3.4 kB 00:00:00
https://repo.mongodb.org/yum/redhat/2.2/mongodb-org/4.2/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found
Trying other mirror.
To address this issue please refer to the below knowledge base article

https://access.redhat.com/articles/1320623

If above article doesn't help to resolve this issue please create a bug on https://bugs.centos.org/

One of the configured repositories failed (MongoDB Repository),
and yum doesn't have enough cached data to continue. At this point the only
safe thing yum can do is fail. There are a few ways to work "fix" this:

 1. Contact the upstream for the repository and get them to fix the problem.

 2. Reconfigure the baseurl/etc. for the repository, to point to a working
    upstream. This is most often useful if you are using a newer
    distribution release than is supported by the repository (and the
    packages for the previous distribution release still work).

 3. Disable the repository, so yum won't use it by default. Yum will then
    just ignore the repository until you permanently enable it again or use
    --enablerepo for temporary usage:

        yum-config-manager --disable mongodb-org-4.2

 4. Configure the failing repository to be skipped, if it is unavailable.
    Note that yum will try to contact the repo. when it runs most commands,
    so will have to try and fail each time (and thus. yum will be be much
    slower). If it is a very temporary problem though, this is often a nice
    compromise:

        yum-config-manager --save --setopt=mongodb-org-4.2.skip_if_unavailable=true

failure: repodata/repomd.xml from mongodb-org-4.2: [Errno 256] No more mirrors to try.
https://repo.mongodb.org/yum/redhat/2.2/mongodb-org/4.2/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found

Following the hint, running yum-config-manager --disable mongodb-org-4.2 and then retrying the installation succeeded.

Verify the Java installation:

$ java -version
java version "1.6.0_38"
OpenJDK Runtime Environment (IcedTea6 1.13.10) (rhel-1.13.10.0.el7_2-x86_64)
OpenJDK 64-Bit Server VM (build 23.25-b01, mixed mode)

To uninstall it if needed, run yum remove java-1.6.0-openjdk.

Running bin/zookeeper-server-start.sh config/zookeeper.properties again showed that the Java version just installed was too old:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/zookeeper/server/quorum/QuorumPeerMain : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.zookeeper.server.quorum.QuorumPeerMain. Program will exit.

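The error indicates that the runtime must support at least class file version 52. The question "Unsupported major.minor version 52.0 [duplicate]" explains which Java release each class file version corresponds to; version 52 corresponds to Java 8 (1.8).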
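
The mapping from class file version to Java release is simple arithmetic: for Java 5 and later, the release number is the major version minus 44. The sketch below illustrates that rule; the function name class_major_to_java is hypothetical, and it deliberately rejects versions below 49 because older releases used a different naming scheme (1.1 through 1.4). It prints 8 for version 52 and 6 for version 50.

```shell
#!/bin/sh
# Hypothetical helper: map a class file major version to the Java release.
# For Java 5 and later the release number is the major version minus 44.
class_major_to_java() {
    major=$1
    if [ "$major" -lt 49 ]; then
        echo "unsupported major version: $major" >&2
        return 1
    fi
    echo $((major - 44))
}

class_major_to_java 52   # the version reported in the error above
class_major_to_java 50   # the highest version a Java 6 runtime can load
```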

The versions available through yum can be listed with yum list available 'java*'. Pick one that meets the requirement and install it, for example yum install java-1.8.0-openjdk.

Check the installation again:

$ java -version
openjdk version "1.8.0_71"
OpenJDK Runtime Environment (build 1.8.0_71-b15)
OpenJDK 64-Bit Server VM (build 25.71-b15, mixed mode)
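
The version check can also be scripted instead of read by eye. The following sketch parses the first line of the java -version banner; the function name java_major_from_banner is hypothetical, and the parsing assumes the banner contains the version in straight double quotes, as in the outputs shown in this article.

```shell
#!/bin/sh
# Hypothetical helper: extract the Java major version from the first line
# of `java -version` output, e.g. 'openjdk version "1.8.0_71"' yields 8.
java_major_from_banner() {
    # Take the quoted version string from the banner.
    version=$(printf '%s\n' "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p')
    [ -n "$version" ] || return 1
    # Java 8 and older report "1.x"; drop the leading "1." in that case.
    version=${version#1.}
    # Keep only the digits before the first '.', '_' or '-'.
    echo "${version%%[._-]*}"
}

java_major_from_banner 'openjdk version "1.8.0_71"'
java_major_from_banner 'java version "1.6.0_38"'
```

Because java -version writes to standard error, a live check would look like java_major_from_banner "$(java -version 2>&1 | head -n 1)".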

Starting the Kafka Service
$ bin/kafka-server-start.sh config/server.properties

Creating a Topic
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Creating a Producer
Start a producer to send messages:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

Creating a Consumer
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Now, whenever a new message is sent from the producer, the consumer receives it in real time.
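
The console producer reads messages from standard input, one per line, so test data can be piped in rather than typed. The sketch below only generates the lines; the helper name gen_messages and the message text are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print N numbered test messages, one per line.
gen_messages() {
    i=1
    while [ "$i" -le "$1" ]; do
        echo "test message $i"
        i=$((i + 1))
    done
}

# Show what would be sent to the topic.
gen_messages 3
```

With the broker from the steps above running, the output could be sent with gen_messages 3 | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test.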

Local macOS Machine
Modifying the Kafka Configuration
$ vi /usr/local/etc/kafka/server.properties

Uncomment the listeners=PLAINTEXT://:9092 line, change it as shown below, and save:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
- listeners=PLAINTEXT://:9092
+ listeners=PLAINTEXT://localhost:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
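
The same change can be made without opening an editor. This is a rough sketch, assuming a POSIX sed and mktemp; the function name set_listeners is hypothetical. It rewrites only a line that starts with listeners= or #listeners=, so the explanatory comments and advertised.listeners are left alone. The value must not contain the characters | or &, which are special in the sed expression used here.

```shell
#!/bin/sh
# Hypothetical helper: set the listeners entry in a server.properties file.
# Matches the entry whether or not it is commented out.
set_listeners() {
    file=$1
    value=$2
    tmp=$(mktemp) || return 1
    sed "s|^#\{0,1\}listeners=.*|listeners=$value|" "$file" > "$tmp" || return 1
    # Copy the result back so the original file keeps its permissions.
    cat "$tmp" > "$file"
    rm -f "$tmp"
}

# Demonstrate on a scratch file rather than the real configuration.
demo=$(mktemp)
printf '%s\n' '#listeners=PLAINTEXT://:9092' \
    '#advertised.listeners=PLAINTEXT://your.host.name:9092' > "$demo"
set_listeners "$demo" 'PLAINTEXT://localhost:9092'
cat "$demo"
rm -f "$demo"
```

On the machine described here it would be called as set_listeners /usr/local/etc/kafka/server.properties 'PLAINTEXT://localhost:9092'.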

Starting ZooKeeper
$ brew services start zookeeper
==> Successfully started `zookeeper` (label: homebrew.mxcl.zookeeper)

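Note: ZooKeeper can be started either through brew or through its own command, and it must be stopped with the matching command. In other words, brew services stop zookeeper only stops a service that was started by brew.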

Starting Kafka
$ brew services start kafka
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)

The commands above start both as persistent background services. To start them temporarily instead, use the following commands:

$ zkServer start
$ kafka-server-start /usr/local/etc/kafka/server.properties

Creating a Topic
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

If the following error occurs:

Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:262)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:258)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1863)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:341)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

  • Author: 福伴
  • Original link: https://blog.csdn.net/weixin_45032957/article/details/117462072
    Updated: September 12, 2022, 10:14:56