confluent installation

Environment variables and data directory setup

export CONFLUENT_CURRENT=/data/hadoop
export CONFLUENT_HOME=/data/apache/confluent
export PATH=$PATH:$CONFLUENT_HOME/bin
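So the exports also take effect in later sessions, they can be persisted in the shell profile; a minimal sketch, assuming bash with ~/.bashrc:

# persist the variables for future sessions (assumes bash with ~/.bashrc)
cat >> ~/.bashrc <<'EOF'
export CONFLUENT_CURRENT=/data/hadoop
export CONFLUENT_HOME=/data/apache/confluent
export PATH=$PATH:$CONFLUENT_HOME/bin
EOF
# reload the profile in the current shell
source ~/.bashrc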

After the variables take effect, check the confluent data directory with the command below and create the kafka and zookeeper data directories

confluent current
# confluent.ljzXwHuB is the directory generated by the command above
mkdir -p /data/hadoop/confluent.ljzXwHuB/zookeeper/data
mkdir -p /data/hadoop/confluent.ljzXwHuB/kafka/data
echo 1 > /data/hadoop/confluent.ljzXwHuB/zookeeper/data/myid
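Since the confluent.* suffix is random, the directory can also be captured straight from the command output instead of being copied by hand; a sketch, assuming `confluent current` prints only the path:

# capture the data directory and create the subdirectories in one go
CONFLUENT_DIR=$(confluent current)
mkdir -p "$CONFLUENT_DIR/zookeeper/data" "$CONFLUENT_DIR/kafka/data"
echo 1 > "$CONFLUENT_DIR/zookeeper/data/myid"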

zookeeper configuration

vim /data/apache/confluent/etc/kafka/zookeeper.properties

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5

dataDir=/data/hadoop/confluent.ljzXwHuB/zookeeper/data
# the port at which the clients will connect
clientPort=2181

# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60

server.1=192.168.11.82:2888:3888
server.2=192.168.11.81:2888:3888
server.3=192.168.11.83:2888:3888

autopurge.snapRetainCount=3
autopurge.purgeInterval=24
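The server.N entries must agree with each host's myid: the 1 written earlier belongs on 192.168.11.82 (server.1), and the other two nodes need their own ids. A sketch, assuming the same directory layout on each host (the random confluent.* suffix will differ per machine):

# on 192.168.11.81 (server.2)
echo 2 > /data/hadoop/confluent.ljzXwHuB/zookeeper/data/myid
# on 192.168.11.83 (server.3)
echo 3 > /data/hadoop/confluent.ljzXwHuB/zookeeper/data/myid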

kafka configuration

vim /data/apache/confluent/etc/kafka/server.properties

broker.id=1

listeners=PLAINTEXT://192.168.11.82:9092

log.dirs=/data/hadoop/confluent.ljzXwHuB/kafka/data

num.partitions=12

log.retention.bytes=-1
log.cleanup.policy=delete

zookeeper.connect=192.168.11.82:2181,192.168.11.81:2181,192.168.11.83:2181

# delete topics
delete.topic.enable=true
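broker.id and listeners are per-broker values; the remaining brokers keep the rest of the file identical but use their own id and address. Once all brokers are up, a quick smoke test (a sketch; the topic name smoke_test is arbitrary):

# start kafka (the old confluent CLI starts zookeeper first as a dependency)
confluent start kafka
# create a replicated test topic and list it back
kafka-topics --zookeeper 192.168.11.82:2181 --create --topic smoke_test --partitions 3 --replication-factor 3
kafka-topics --zookeeper 192.168.11.82:2181 --list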

Check zookeeper status

Check with the nc command

# centos (the nc binary is provided by the nmap-ncat package)
yum install nmap-ncat
# debian
apt-get install netcat
# check whether this zookeeper node is the leader or a follower
echo stat | nc 192.168.1.20 2181 | grep Mode
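To check the whole ensemble at once, the same probe can be looped over the three nodes configured above:

# report the role of every node in the ensemble
for host in 192.168.11.82 192.168.11.81 192.168.11.83; do
  echo -n "$host: "
  echo stat | nc "$host" 2181 | grep Mode
done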

schema-registry configuration

Edit the configuration file "schema-registry.properties". Commonly used commands:

# List all subjects
curl -X GET http://localhost:8081/subjects

# Deletes all schema versions registered under the subject "Kafka-value"
curl -X DELETE http://localhost:8081/subjects/Kafka-value
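Two more calls that are handy before deleting a subject, against the same REST API (default port 8081):

# List all versions registered under the subject "Kafka-value"
curl -X GET http://localhost:8081/subjects/Kafka-value/versions
# Fetch one specific version of the schema
curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1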

connect configuration

Edit the configuration file "connect-avro-distributed.properties". Commonly used commands:

# start connect
confluent start connect
# list connectors
confluent list connectors
# load the hdfs sink connector (place the config file under etc/kafka-connect-hdfs)
confluent load hdfs-sink -d etc/kafka-connect-hdfs/test-hdfs.properties
# unload hdfs sink
confluent unload hdfs-sink
# connector's status
confluent status connectors
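The confluent CLI wraps the Kafka Connect REST interface; the same information can be read from it directly, which helps when a task is stuck (a sketch, assuming the default Connect port 8083):

# list connectors through the Connect REST API
curl http://localhost:8083/connectors
# detailed status of the hdfs-sink connector and its tasks
curl http://localhost:8083/connectors/hdfs-sink/status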

test-hdfs.properties configuration

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=2
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=log_date,log_hour
#format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
#hive.integration=true
#hive.metastore.uris=thrift://localhost:9083
#schema.compatibility=BACKWARD
#hive.database=test
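Test records matching the Hive table below can be produced with kafka-avro-console-producer; a sketch (the Avro schema here is an assumption derived from the table columns, not part of the original setup):

kafka-avro-console-producer --broker-list 192.168.11.82:9092 --topic test_hdfs \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"log","fields":[{"name":"req_id","type":"string"},{"name":"ip","type":"string"},{"name":"create_date","type":"int"},{"name":"log_date","type":"int"},{"name":"log_hour","type":"int"}]}'
# then type one JSON record per line on stdin, e.g.
# {"req_id":"r1","ip":"10.0.0.1","create_date":20180712,"log_date":20180712,"log_hour":1}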

hive external table

-- the generated avro files carry their own schema
CREATE EXTERNAL TABLE `test_hdfs`(
  `req_id` string,
  `ip` string, 
  `create_date` int
)
PARTITIONED BY (
  `log_date` int,
  `log_hour` int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS AVRO
LOCATION '/topics/test_hdfs/'
;

-- partitions can be added in either of the following two ways; data can only be queried after the partitions are added
msck repair table test_hdfs;
alter table test_hdfs add partition (log_date=20180712, log_hour=1);
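A quick check from the shell once the partitions are registered (a sketch using the hive CLI; beeline works the same way):

# list the partitions Hive knows about, then sample a few rows
hive -e "SHOW PARTITIONS test_hdfs; SELECT * FROM test_hdfs LIMIT 10;"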
