Troubleshooting Kerberos + Kafka

2022-12-29 11:26:26

In the Kafka configuration UI, set security.inter.broker.protocol to SASL_PLAINTEXT so the brokers themselves also communicate over SASL; the matching server.properties entries are sketched below.
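For reference, a sketch of the broker-side settings in server.properties (standard Kafka broker config names; the listener host/port is an assumption based on the examples later in this post):

listeners=SASL_PLAINTEXT://master:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka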

Log in to kadmin, add the principals, and generate keytabs

kadmin.local
add_principal flink/master@AQ.COM
add_principal flink/worker-1@AQ.COM
add_principal flink/worker-2@AQ.COM
Generate the keytab files
xst -k /usr/local/keytab/flink/flink-master.keytab flink/master@AQ.COM
xst -k /usr/local/keytab/flink/flink-worker-1.keytab flink/worker-1@AQ.COM
xst -k /usr/local/keytab/flink/flink-worker-2.keytab flink/worker-2@AQ.COM
Or non-interactively:
kadmin.local -q "xst -k /usr/local/keytab/flink/flink-master.keytab flink/master@AQ.COM"

Merge the keytabs (in ktutil, rkt reads a keytab into the buffer and wkt writes the merged buffer out)

ktutil
rkt /usr/local/keytab/flink/flink-master.keytab
rkt /usr/local/keytab/flink/flink-worker-1.keytab
rkt /usr/local/keytab/flink/flink-worker-2.keytab
wkt /usr/local/keytab/flink/flink.keytab
quit

Inspect a keytab

klist -ket /usr/local/keytab/flink/flink.keytab

Note that a kinit login and a kadmin.local session cannot be used at the same time.

kinit -kt /usr/local/keytab/flink/flink-master.keytab flink/master@AQ.COM

Destroy the credential cache

kdestroy

Kafka produce and consume

1. To produce or consume from a client you need a jaas.conf plus separate producer and consumer properties files.
2. In code you must set two JVM system properties: java.security.krb5.conf and java.security.auth.login.config (see the KafkaApplication class below).

jaas.conf (the KafkaClient section is read by Kafka producers/consumers, Client by the ZooKeeper client, and KafkaServer by the broker)

KafkaClient{
 com.sun.security.auth.module.Krb5LoginModule required
 keyTab="/usr/local/keytab/flink/flink.keytab"
 principal="flink/master@AQ.COM"
 useKeyTab=true
 useTicketCache=true;
};
Client{
 com.sun.security.auth.module.Krb5LoginModule required
 keyTab="/usr/local/keytab/flink/flink.keytab"
 principal="flink/master@AQ.COM"
 useKeyTab=true
 useTicketCache=true;
};
KafkaServer{
 com.sun.security.auth.module.Krb5LoginModule required
 keyTab="/usr/local/keytab/flink/flink.keytab"
 principal="flink/master@AQ.COM"
 useKeyTab=true
 useTicketCache=true;
};

producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=testClient

Console producer/consumer commands

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/keytab/kafka/jaas.conf"
kafka-console-consumer --bootstrap-server master:9092 --topic tes --consumer.config /usr/local/keytab/kafka/consumer.properties
kafka-console-producer --broker-list master:9092 --topic tes --producer.config /usr/local/keytab/kafka/producer.properties
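The same produce path from Java, as a minimal sketch: it reuses the properties and file paths shown above (the topic name tes comes from the console examples; kafka-clients on the classpath is assumed):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KerberosProducerDemo {
    public static void main(String[] args) {
        // The same two JVM-level Kerberos settings that KAFKA_OPTS passes to the console tools.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/usr/local/keytab/kafka/jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("tes", "key", "hello from a kerberized client"));
            producer.flush();
        }
    }
}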

Spring Boot Kafka configuration
application.yml

spring:
  kafka:
    bootstrap-servers: master:9092,worker-2:9092
    producer:
      # acks controls how many acknowledgements the leader must collect before a send counts as complete:
      # acks=0   the producer does not wait for any acknowledgement; the record goes into the socket buffer
      #          and is treated as sent. No delivery guarantee, retries have no effect, and the returned
      #          offset is always -1.
      # acks=1   the leader writes the record to its local log and responds without waiting for the
      #          followers; if the leader fails right after acknowledging but before replication, the
      #          record is lost.
      # acks=all the leader waits for the full set of in-sync replicas to acknowledge; the record survives
      #          as long as at least one in-sync replica stays alive. Strongest guarantee, equivalent to -1.
      # Valid values: all, -1, 0, 1
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Number of retries on send failure. When a leader fails and a replica takes over, writes may fail
      # transiently; with retries=0 the producer does not resend, while retries>0 resends once the new
      # leader is in place, avoiding message loss.
      retries: 0
    consumer:
      group-id: mysqltestUserBehaviorGroup
      enable-auto-commit: true
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    jaas:
      enabled: true
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.kerberos.service.name: kafka
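With this YAML in place, Spring Boot auto-configures a KafkaTemplate that already carries the SASL/Kerberos client properties. A minimal sending sketch (the service class name is a placeholder, not from the original post):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // The Kerberos/SASL settings come from spring.kafka.properties in application.yml.
        kafkaTemplate.send("tes", message);
    }
}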

When consuming from plain Java or Flink, it is enough to call getKerberosJaas() (defined in KafkaApplication below) before the Kafka client is created; a Flink sketch follows.
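A minimal Flink sketch under that assumption, using the flink-connector-kafka FlinkKafkaConsumer API (topic, group id, and paths are taken from the examples above; treat them as placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkKerberosKafkaJob {
    public static void main(String[] args) throws Exception {
        // The same two JVM properties that getKerberosJaas() sets below.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/usr/local/keytab/kafka/jaas.conf");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092");
        props.setProperty("group.id", "flinkTestGroup");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKafkaConsumer<>("tes", new SimpleStringSchema(), props)).print();
        env.execute("kerberos-kafka-demo");
    }
}

The Spring Boot entry point below does the same wiring: it sets the two system properties before the application context (and its Kafka clients) starts.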


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanBasePackages = "org.fuwushe")
public class KafkaApplication {

    public static void main(String[] args) {
        // Wire up Kerberos before the Spring context (and its Kafka clients) starts.
        getKerberosJaas();
        SpringApplication.run(KafkaApplication.class, args);
    }

    public static void getKerberosJaas() {
        // Point the JVM at the Kerberos realm configuration.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        // Load the local jaas.conf file.
        System.setProperty("java.security.auth.login.config", "/usr/local/keytab/kafka/jaas.conf");

        // Optional switches when troubleshooting GSSAPI issues:
        //System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        //System.setProperty("sun.security.krb5.debug", "true");
    }
}


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.fuwushe.da.kafka.ConsumerListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;


@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;


    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {

        Map<String, Object> propsMap = new HashMap<>();
        try {
            // Append the local IP (dots stripped) so each host gets its own consumer group.
            String addr = InetAddress.getLocalHost().getHostAddress().replaceAll("\\.", "");
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + addr);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }

        // These two are real Kafka client properties and are required here, because this hand-built
        // factory bypasses the spring.kafka.properties section of application.yml.
        propsMap.put("security.protocol", "SASL_PLAINTEXT");
        propsMap.put("sasl.kerberos.service.name", "kafka");
        // Note: java.security.auth.login.config and java.security.krb5.conf are JVM system properties,
        // not Kafka client properties; putting them in this map has no effect (see getKerberosJaas()).
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    /**
     * Kafka listener bean.
     */
    @Bean
    public ConsumerListener listener() {

        return new ConsumerListener();
    }

}
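The ConsumerListener registered above is referenced but not shown in the original post; a minimal sketch of what it might contain (the topic name is a placeholder). It is exposed as a @Bean in KafkaConsumerConfig, so no stereotype annotation is needed:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class ConsumerListener {

    // Uses the container factory defined in KafkaConsumerConfig above.
    @KafkaListener(topics = "tes", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
    }
}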

  • Author: dian张
  • Original post: https://blog.csdn.net/u013086392/article/details/103011319 (updated 2022-12-29 11:26:26)