引言

ZooKeeper 和 Kafka 是常用的分布式系统组件,ZooKeeper 提供分布式协调服务,Kafka 则是一个高吞吐量的分布式消息队列系统。在生产环境中,为了保障数据安全,通常需要配置 SASL 认证机制。本文将详细介绍如何在单节点环境下部署 ZooKeeper 和 Kafka,并配置 SASL 认证,确保通信安全。

SASL 认证

简单认证和安全层 (Simple Authentication and Security Layer,SASL) 是一种用于在网络协议中提供认证和数据安全性的框架。它不定义具体的认证机制,而是提供了一个通用的框架,允许不同的认证机制 (如 Kerberos, GSSAPI, PLAIN 等) 插入到应用程序协议中,例如 LDAP, SMTP, IMAP 等。

SASL 的主要优点在于它的 灵活性和可扩展性 。通过将认证机制与应用程序协议分离,SASL 允许开发者根据需要选择和使用不同的认证机制,而无需修改应用程序协议本身。这使得应用程序能够支持多种认证方式,并更容易适应新的安全需求。

环境说明

操作系统: Debian 12
Java : JDK 17
ZooKeeper: 3.8.4
Kafka: 2.13-3.8.0

软件安装目录: /root/software

安装 JDK

如果存在 JDK 就跳过这一步

  1. 下载安装 jdk

    1
    2
    3
    4
    cd /root/software
    wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
    tar -zvxf jdk-17_linux-x64_bin.tar.gz
    mv jdk-17.0.12 jdk17
  2. 配置环境变量

    1
    2
    3
    4
    5
    vim /etc/profile

    # 添加以下内容
    export JAVA_HOME=/root/software/jdk17
    export PATH=$JAVA_HOME/bin:$PATH
  3. 检查 JDK

    1
    2
    3
    4
    5
    6
    source /etc/profile

    $ java -version
    java version "17.0.12" 2024-07-16 LTS
    Java(TM) SE Runtime Environment (build 17.0.12+8-LTS-286)
    Java HotSpot(TM) 64-Bit Server VM (build 17.0.12+8-LTS-286, mixed mode, sharing)

Zookeeper 单机模式

安装 Zookeeper

  1. 下载安装 Zookeeper

    1
    2
    3
    4
    5
    wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
    tar -zvxf apache-zookeeper-3.8.4-bin.tar.gz
    mv zookeeper-3.8.4 zookeeper

    echo '3.8.4' > zookeeper/VERSION
  2. 配置环境变量

    1
    2
    3
    4
    5
    vim /etc/profile

    # 添加以下内容
    export ZOOKEEPER_HOME=/root/software/zookeeper
    export PATH=$ZOOKEEPER_HOME/bin:$PATH
  3. 修改配置

    1
    2
    3
    4
    cd zookeeper/config
    cp zoo_sample.cfg zoo.cfg

    vim zoo.cfg

    修改以下信息,没有则添加

    1
    2
    3
    dataDir=/root/software/zookeeper/zkData
    dataLogDir=/root/software/zookeeper/logs
    4lw.commands.whitelist=ruok,stat,conf,isro
  4. 启动 zookeeper

    1
    zkServer.sh start

    查看运行状态

    1
    zkServer.sh status

    image-20240902105903227

  5. 使用命令检测是否正常运行

    1
    2
    echo ruok | nc 127.0.0.1 2181
    imok#

SASL 认证

  1. 修改配置文件

    1
    2
    cd zookeeper/config
    vim zoo.cfg

    末尾添加以下信息

    1
    2
    3
    4
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    jaasLoginRenew=3600000
    requireClientAuthScheme=sasl
    zookeeper.sasl.client=true
  2. 创建密码文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="zk-admin-password"
    user_kafka="zk-kafka-password"
    user_ops="zk-ops-password";
    };


    Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ops"
    password="zk-ops-password";
    };
  3. 修改 zkEnv.sh
    export SERVER_JVMFLAGS 末尾添加 -Djava.security.auth.login.config=/root/software/zookeeper/conf/zk_server_jaas.conf

    1
    2
    3
    4
    vim bin/zkEnv.sh

    # 修改以下内容
    export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/root/software/zookeeper/conf/zk_server_jaas.conf"
  4. 重新启动 zzk

    1
    2
    zkServer.sh restart
    zkServer.sh status
  5. 验证服务

    1
    zkCli.sh -server localhost:2181

Kafka 单机模式

安装 Kafka

  1. 下载安装 Kafka

    1
    2
    3
    4
    5
    wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
    tar -zvxf kafka_2.13-3.8.0.tgz
    mv kafka_2.13-3.8.0 kafka

    echo 2.13-3.8.0 > kafka/VERSION
  2. 配置环境变量

    1
    2
    3
    4
    5
    vim /etc/profile

    # 添加以下内容
    export KAFKA_HOME=/root/software/kafka
    export PATH=$KAFKA_HOME/bin:$PATH
  3. zookeeper 配置

    1
    2
    3
    4
    5
    6
    7
    8
    vim config/server.properties

    # 日志目录
    log.dirs=/root/software/kafka/kafka-logs


    # zookeeper 连接配置
    zookeeper.connect=localhost:2181
  4. 启动 kafka

    1
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  5. 查看进程

    1
    2
    3
    4
    $ jps  
    3693896 Kafka
    1574625 QuorumPeerMain
    1576837 Jps

SASL 认证

  1. 修改配置文件

vim config/server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
listeners=SASL_PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=SASL_PLAINTEXT://localhost:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
  1. 服务端配置 server jaas 文件
    Kafka 的用户认证基于 Java 的 JAAS 框架。因此,我们需要先添加 JAAS 服务端的配置文件。
    config/kafka_server_jaas.conf 中添加以下配置信息

    1
    2
    cd $KAFKA_HOME/config
    vim kafka_server_jaas.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="your admin password"
    user_admin="your admin password"
    user_producer="your password"
    user_consumer="your password";
    };

    Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="zk-kafka-passwrd";
    };

    容易引起疑惑的是 user_admin 和 user_producer 这两个属性。它们用于定义用户名和密码.
    格式为:user_userName=password。这里定义了用户 admin 和用户 producer 的密码分别为”admin-secret” 和”producer-secret”。

  2. 客户端配置 client jaas 文件

1
2
cd $KAFKA_HOME/config
vim kafka_client_jaas.conf
1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="your password";
};

这里密码要与 server jaas 密码对应

  1. 将 JAAS 配置到 kafka 服务器节点的 JVM 启动参数中,复制 kafka 的启动脚本 kafka-server-start.sh,命名 kafka-server-start-saal.sh,如下操作
1
2
cp zookeeper-server-start.sh kafka-server-start-sasl.sh
vim kafka-server-start-sasl.sh
1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/root/software/kafka/kafka_server_jaas.conf"
fi
  1. kafka-console-consumer.shkafka-console-producer.sh 与 4 同样的操作
  2. 配置授权信息文件
1
vim config/jaas.properties
1
2
3
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="your password";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  1. 启动 Kafka
1
2
kafka-server-stop.sh
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  1. 查看启动日志是否正常
1
tail -f logs/server.log

kafka 命令使用(SASL)

除了 kafka-console-consumer.shkafka-console-producer.sh 都需要附带 --command-config config/jaas.properties 参数进行认证

  • 查看 topic list

    1
    kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config config/jaas.properties
  • 消费者

    1
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --topic test 
  • 生产者

    1
    kafka-console-producer.sh --broker-list localhost:9092 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN --topic test
  • 查看 topic 详情

    1
    kafka-topics.sh --command-config config/jaas.properties --describe --bootstrap-server kl.do:9092 --topic test
  • 消费组列表

    1
    kafka-consumer-groups.sh --bootstrap-server kl.do:9092 --list --command-config config/jaas.properties
  • 消费组详情

    1
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --command-config config/jaas.properties --group minio-group 

GO 使用 Kafka Sasl 认证

安装 github.com/IBM/sarama

1
go get github.com/IBM/sarama

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
var (
brokers = []string{"domain:9092"}
)

const (
minioEventTopic = "event"
minioEventGroup = "event-group"
)

func GetKafkaConsumerGroup() sarama.ConsumerGroup {
// 设置日志
sarama.Logger = log.New(os.Stdout, "[sarama] event ", log.LstdFlags)

// 创建消费者组配置
config := sarama.NewConfig()

// 设置 SASL 认证
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = "consumer"
config.Net.SASL.Password = "your password"

// 设置消费者组
config.Version = sarama.V3_6_0_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest

// 创建消费者组
consumerGroup, err := sarama.NewConsumerGroup(brokers, minioEventGroup, config)
if err != nil {
log.Fatalf("Error creating consumer eventGroup: %v", err)
}

return consumerGroup
}

type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("获取到消息 partition:%d, offset: %d, key:%s \n", msg.Partition, msg.Offset, string(msg.Key))
sess.MarkMessage(msg, "")
}
return nil
}