Johuer's Blog

Learn more, write leaner code

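Kafka Installation, Configuration, and a Java Example

Reference: "Kafka: The Definitive Guide"

Installing Kafka

Prerequisites: Linux, Java, ZooKeeper

Recent Kafka releases ship with ZooKeeper, so for a simple demo it is enough to install Kafka alone.

Step1: Download Kafka and extract it to /usr/local/kafka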

Step2: Create the directory /tmp/kafka-logs

This directory is needed because config/server.properties points to it:

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
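
As a rough sketch of how this setting could be checked before the broker starts, the Java snippet below reads `log.dirs` from properties text and creates any directory that is missing. The class `LogDirsCheck` and its methods are hypothetical helpers written for this post, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: makes sure every directory
// listed in log.dirs exists before the broker is started.
public class LogDirsCheck {

    // Parses properties text and returns the comma-separated log.dirs entries.
    public static List<Path> parseLogDirs(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        // Falls back to /tmp/kafka-logs, the value shown in the excerpt above.
        String value = props.getProperty("log.dirs", "/tmp/kafka-logs");
        List<Path> dirs = new ArrayList<>();
        for (String entry : value.split(",")) {
            if (!entry.trim().isEmpty()) {
                dirs.add(Paths.get(entry.trim()));
            }
        }
        return dirs;
    }

    // Creates each directory (and missing parents); existing ones are left alone.
    public static void ensureExist(List<Path> dirs) throws IOException {
        for (Path dir : dirs) {
            Files.createDirectories(dir);
        }
    }

    public static void main(String[] args) throws IOException {
        List<Path> dirs = parseLogDirs("log.dirs=/tmp/kafka-logs\n");
        ensureExist(dirs);
        System.out.println("Ready: " + dirs);
    }
}
```

In a real setup the properties text would be read from config/server.properties rather than passed as a literal string.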

Step3: Configure environment variables

Add the following to any one of ~/.bash_profile, ~/.bashrc, or /etc/profile:

# kafka
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH

Step4: Start Kafka

Note: ZooKeeper must be running before Kafka starts. ZooKeeper listens on port 2181 and Kafka on port 9092.

The ZooKeeper address comes from config/server.properties:


############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
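
To make the format described in those comments concrete, here is a minimal Java sketch that splits a `zookeeper.connect` value into its host:port pairs and the optional chroot. `ZkConnectString` is a hypothetical class for illustration, not Kafka's own parser, and the `/kafka` chroot in `main` is an invented example value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Kafka's own parser: splits a zookeeper.connect
// value into its host:port pairs and the optional chroot suffix.
public class ZkConnectString {
    public final List<String> servers = new ArrayList<>();
    public final String chroot; // empty when no chroot is given

    public ZkConnectString(String connect) {
        // The chroot, if present, starts at the first '/' and applies to all servers.
        int slash = connect.indexOf('/');
        String hosts = slash >= 0 ? connect.substring(0, slash) : connect;
        chroot = slash >= 0 ? connect.substring(slash) : "";
        for (String pair : hosts.split(",")) {
            if (!pair.trim().isEmpty()) {
                servers.add(pair.trim());
            }
        }
    }

    public static void main(String[] args) {
        // "/kafka" is an example chroot, not a value from this post's config.
        ZkConnectString zk =
                new ZkConnectString("127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka");
        System.out.println(zk.servers + " chroot=" + zk.chroot);
    }
}
```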

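# Run from /usr/local/kafka. Start the bundled ZooKeeper first, then the broker.
zookeeper-server-start.sh -daemon config/zookeeper.properties
kafka-server-start.sh -daemon config/server.properties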
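
One quick way to confirm that both processes came up is to check whether anything accepts TCP connections on ports 2181 and 9092. The Java sketch below does that with a plain socket; `PortCheck` is a hypothetical helper, and a successful connection only shows that something is listening, not that it is ZooKeeper or Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether something accepts TCP connections
// on a given host and port. It does not prove that the listener is Kafka.
public class PortCheck {

    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```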

Step5: Verify

1. Create a topic and view its details

[johuer@pd11 kafka]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo8
Created topic "demo8".
[johuer@pd11 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo8
Topic:demo8 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: demo8 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

2. Publish messages to the topic

[johuer@pd11 kafka]$ kafka-console-producer.sh --broker-list localhost:9092 --topic demo8
>hello
>everyone

3. Read messages from the topic

[johuer@pd11 kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo8 --from-beginning
hello
everyone

Java Example

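import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDemoProducer {

    // The original snippet used an undeclared `log`; an SLF4J logger is assumed here.
    private static final Logger log = LoggerFactory.getLogger(KafkaDemoProducer.class);

    public static void main(String[] args) {
        produce();
        consume(); // polls forever; stop the program manually
    }

    static void produce() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "10.211.55.11:9092");
        kafkaProps.put("key.serializer", StringSerializer.class.getName());
        kafkaProps.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer even if send() fails
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            // topic "demo9", key "good", value "java-kafka"
            ProducerRecord<String, String> record = new ProducerRecord<>("demo9", "good", "java-kafka");
            // get() blocks until the broker acknowledges the record (synchronous send)
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static void consume() {
        // 1. Create the consumer
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "10.211.55.11:9092");
        kafkaProps.put("group.id", "demo-group9");
        kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
        kafkaProps.put("value.deserializer", StringDeserializer.class.getName());
        // Added assumption: without this, a new group starts at the latest offset
        // and would miss the record that produce() sent before subscribing.
        kafkaProps.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);

        // 2. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("demo9"));

        // 3. Poll for records
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    log.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

}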