1. 安装必要的依赖包
apt-get install krb5-kdc libkrb5-dev python3-six -y --fix-missing
pip3 install gssapi==1.6.6 kafka-python==2.0.1 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
2. 获取必要的配置文件krb5.conf、kafka.keytab、jaas.conf放到/etc目录下
3. 配置/etc/hosts地址映射到kafka数据节点
echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts
4. 生成kerberos认证票据
kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH
5. 初始化kerberos环境变量
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
6. 代码示例
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import time
import json
from kafka import KafkaProducer
def get_producer():
# 配置地址映射,127.0.0.1为示例
with open('/etc/hosts', 'r') as hosts:
if 'bigdata' not in hosts.read():
os.system('echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts')
# kafka鉴权文件软链接到/etc目录
cur_dir = os.path.dirname(os.path.abspath(__file__)) # 自定义该目录
os.system('ln -fs %s/krb5.conf /etc/krb5.conf' % cur_dir)
os.system('ln -fs %s/kafka.keytab /etc/kafka.keytab' % cur_dir)
os.system('ln -fs %s/jaas.conf /etc/jaas.conf' % cur_dir)
# 生成kafka认证密钥,配置系统环境变量
os.system('kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH')
os.environ['KAFKA_OPTS'] = '-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'
producer = KafkaProducer(**{
'bootstrap_servers': ['127.0.0.1:9092', '127.0.0.1:9092', '127.0.0.1:9092'],
'security_protocol': 'SASL_PLAINTEXT', # 安全协议
'sasl_mechanism': 'GSSAPI', # SASL机制
'compression_type': 'gzip', # 压缩方式,可选配置
'api_version': (0, 10, 2), # API版本
'max_block_ms': 3000, # 发送请求最大阻塞时间
'value_serializer': lambda value: json.dumps(value).encode(), # 数据序列化方法
'sasl_kerberos_service_name': 'kafka' # kerberos服务名称
})
# 等待0.5秒后检测是否连接成功了
time.sleep(0.5)
if not producer.bootstrap_connected():
raise Exception('Connect kafka failed')
return producer
# 因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,需要通过消息队列做生产者消费者模型
producer = get_producer()
print(producer.send('kafka_topic', {'test': 'test_data'}, partition=0).get(timeout=1))
7. 注意的点
pip安装的依赖包是kafka-python,不是kafka,这两个都有,不要混淆了。
因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,多线程多进程场景下可以使用redis作为消息中间件实现生产者消费者模型。
使用get_producer获取一次认证后的连接就可以长期使用,票据过期不会导致后续的推送失败。