python推送数据到kafka的kerberos服务验证(ubuntu)

2023年1月5日08:29:55

1. 安装必要的依赖包

apt-get install krb5-kdc libkrb5-dev python3-six -y --fix-missing
pip3 install gssapi==1.6.6 kafka-python==2.0.1 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

2. 获取必要的配置文件krb5.conf、kafka.keytab、jaas.conf放到/etc目录下

3. 配置/etc/hosts地址映射到kafka数据节点

echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts

4. 生成kerberos认证票据

kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH

5. 初始化kerberos环境变量

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"

6. 代码示例

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import time
import json
from kafka import KafkaProducer


def get_producer():
    # 配置地址映射,127.0.0.1为示例
    with open('/etc/hosts', 'r') as hosts:
        if 'bigdata' not in hosts.read():
            os.system('echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts')

    # kafka鉴权文件软链接到/etc目录
    cur_dir = os.path.dirname(os.path.abspath(__file__))  # 自定义该目录
    os.system('ln -fs %s/krb5.conf /etc/krb5.conf' % cur_dir)
    os.system('ln -fs %s/kafka.keytab /etc/kafka.keytab' % cur_dir)
    os.system('ln -fs %s/jaas.conf /etc/jaas.conf' % cur_dir)

    # 生成kafka认证密钥,配置系统环境变量
    os.system('kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH')
    os.environ['KAFKA_OPTS'] = '-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'

    producer = KafkaProducer(**{
        'bootstrap_servers': ['127.0.0.1:9092', '127.0.0.1:9092', '127.0.0.1:9092'],
        'security_protocol': 'SASL_PLAINTEXT',  # 安全协议
        'sasl_mechanism': 'GSSAPI',  # SASL机制
        'compression_type': 'gzip',  # 压缩方式,可选配置
        'api_version': (0, 10, 2),  # API版本
        'max_block_ms': 3000,  # 发送请求最大阻塞时间
        'value_serializer': lambda value: json.dumps(value).encode(),  # 数据序列化方法
        'sasl_kerberos_service_name': 'kafka'  # kerberos服务名称
    })

    # 等待0.5秒后检测是否连接成功了
    time.sleep(0.5)
    if not producer.bootstrap_connected():
        raise Exception('Connect kafka failed')
    return producer

# 因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,需要通过消息队列做生产者消费者模型
producer = get_producer()
print(producer.send('kafka_topic', {'test': 'test_data'}, partition=0).get(timeout=1))

7. 注意的点

pip安装的依赖包是kafka-python,不是kafka,这两个都有,不要混淆了。

因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,多线程多进程场景下可以使用redis作为消息中间件实现生产者消费者模型。

使用get_producer获取一次认证后的连接就可以长期使用,票据过期不会导致后续的推送失败。

  • 作者:旷古的寂寞
  • 原文链接:https://blog.csdn.net/kuanggudejimo/article/details/107186468
    更新时间:2023年1月5日08:29:55 ,共 2141 字。