安装环境:

系统:

CentOS release 6.8 (Final)

服务器:

120.77.32.220

kafka版本:

kafka_2.11-2.0.0.tgz

zookeeper版本:

zookeeper-3.4.13.tar.gz

1、简介

Kafka(官网地址:http://kafka.apache.org)是一款分布式的消息发布与订阅系统,具有高性能、高吞吐量的特点。


2、下载软件包

iptables -F

setenforce 0

cd /opt/software

wget

jdk下载地址

3、安装java

3.1、解压jdk

cd /opt/software

tar -xzf jdk-8u65-linux-x64.tar.gz -C /usr/local/

mv /usr/local/jdk1.8.0_65/ /usr/local/java

3.2、配置环境变量

cat /etc/profile

# jdk

JAVA_HOME=/usr/local/java

JRE_HOME=/usr/local/java/jre

CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH

PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

source /etc/profile

3.3、查看JAVA是否安装成功

java -version

[root@localhost local]# java -version

java version "1.8.0_151"

Java(TM) SE Runtime Environment (build 1.8.0_151-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

4、kafka安装

tar -xzf kafka_2.11-2.0.0.tgz -C /usr/local/

mv /usr/local/kafka_2.11-2.0.0/ /usr/local/kafka

5、配置环境变量

cat /etc/profile

# kafka

export KAFKA_HOME=/usr/local/kafka

export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

6、配置kafka

mkdir -p /usr/local/kafka/log/kafka #创建kafka日志目录

cd /usr/local/kafka/config #进入配置目录

vi server.properties #编辑修改相应的参数

broker.id=0

port=9092 #端口号

host.name=192.168.0.13 #服务器IP地址,修改为自己的服务器IP

log.dirs=/usr/local/kafka/log/kafka #日志存放路径,上面创建的目录

zookeeper.connect=192.168.0.13:2181 #zookeeper地址和端口,单机配置部署,localhost:2181

7、配置zookeeper

mkdir /usr/local/kafka/zookeeper #创建zookeeper目录

mkdir /usr/local/kafka/log/zookeeper #创建zookeeper日志目录

cd /usr/local/kafka/config #进入配置目录

vi zookeeper.properties #编辑修改相应的参数

dataDir=/usr/local/kafka/zookeeper #zookeeper数据目录

dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日志目录

clientPort=2181

maxClientCnxns=100

tickTime=2000

initLimit=10

syncLimit=5

8、创建启动、关闭kafka脚本

cd /usr/local/kafka

#创建启动脚本

cat kafkastart.sh

#!/bin/sh

#启动zookeeper

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &

sleep 3

#启动kafka

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &


#创建关闭脚本

cat kafkastop.sh

#!/bin/bash

#关闭kafka

/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &

sleep 3

#关闭zookeeper

/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &


#添加脚本执行权限

chmod +x kafkastart.sh

chmod +x kafkastop.sh

9、设置脚本开机自动执行

vi /etc/rc.d/rc.local

sh /usr/local/kafka/kafkastart.sh &

10、启动Kafka、zookeeper服务

#启动kafka

sh /usr/local/kafka/kafkastart.sh

#关闭kafka

sh /usr/local/kafka/kafkastop.sh


netstat -nptl |grep 2181

netstat -nptl |grep 9092

查看zookeeper版本:

echo stat|nc localhost 2181

11、测试生产者和消费者

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

bin/kafka-topics.sh --list --zookeeper localhost:2181

test


bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

>This is a message

>This is another message

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

12、脚本定期清理logs下的日志文件

cat clean_kafkalog.sh

#!/bin/bash

#Description:This script is used to clear kafka logs, not message file.

#History: 2018-09-04 First release.

# log file dir.

logDir=/usr/local/kafka/logs

# Reserved 7 files.

COUNT=7

ls -t $logDir/server.log* | tail -n +$[$COUNT+1] | xargs rm -f

ls -t $logDir/controller.log* | tail -n +$[$COUNT+1] | xargs rm -f

ls -t $logDir/state-change.log* | tail -n +$[$COUNT+1] | xargs rm -f

ls -t $logDir/log-cleaner.log* | tail -n +$[$COUNT+1] | xargs rm -f

chmod +x clean_kafkalog.sh


每周日的0点0分去执行这个脚本:

crontab -e

0 0 * * 0 /usr/local/kafka/clean_kafkalog.sh

13、配置SASL用户名密码认证

13.1、zookeeper配置

13.1.1、修改zookeeper.properties配置文件

vim /usr/local/kafka/config/zookeeper.properties

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

13.1.2、创建JAAS配置文件

cat /usr/local/kafka/config/zk_server_jaas.conf

ZKServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin-secret"

user_admin="admin-secret";

};

13.1.3、启动zookeeper前设置环境变量

cat /usr/local/kafka/bin/zookeeper-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/zk_server_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"

13.1.4、启动zookeeper,编写启动脚本测试配置是否正确

cat /usr/local/kafka/zookeeper_start.sh

#!/bin/bash


export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/zk_server_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer "

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties >> /usr/local/kafka/log/zookeeper/zookeeper.log 2>&1 &


chmod +x zookeeper_start.sh

./zookeeper_start.sh

13.2、kafka配置

13.2.1、伪分布式Kafka Broker配置

配置server.properties:

vim /usr/local/kafka/config/server.properties #文件最后加入

# 监听的地址,使用的安全协议是SASL_PLAINTEXT,即最简单的用户名密码认证,传输过程不加密。

listeners=SASL_PLAINTEXT://192.168.0.13:9092


# 使用的认证协议

security.inter.broker.protocol=SASL_PLAINTEXT


# SASL机制

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN


# 完成授权(ACL权限校验)的类

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer


# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。

#allow.everyone.if.no.acl.found=true

super.users=User:admin;User:alice

13.2.2、创建JAAS配置文件

vim /usr/local/kafka/config/kafka_server_jaas.conf

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="geting"

password="geting"

user_geting="geting"

user_alice="geting";

};


Client {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin-secret";

};

13.2.3、启动kafka前设置环境变量

vim /usr/local/kafka/bin/kafka-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf "


cat /usr/local/kafka/bin/kafka-run-class.sh


KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'

# Launch mode

if [ "x$DAEMON_MODE" = "xtrue" ]; then

nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &

else

exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"

fi

13.2.4、启动kafka,编写启动脚本测试配置是否正确

vim /usr/local/kafka/kafka_start.sh

#!/bin/bash

export KAFKA_OPTS=" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties >>/usr/local/kafka/log/kafka/kafka.log 2>&1 &


chmod +x kafka_start.sh

./kafka_start.sh


14、为你需要使用的用户授权

./kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.0.15:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security

查询已经授权的用户:

./kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.0.15:2181 --list --topic sean-security

15、Consumer & Producer端配置

15.1、创建kafka_client_jaas.conf文件

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="alice"

password="alice2";

};

15.2、让程序加载这个配置文件

System.setProperty("java.security.auth.login.config",

"$KAFKA_HOME/config/kafka_client_jaas.conf"); // 设置系统属性;Java字符串不会展开$KAFKA_HOME,此处需填写配置文件的实际路径

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.mechanism", "PLAIN");