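Reference: Kafka: The Definitive Guide
Installing Kafka
Prerequisites: Linux, Java, ZooKeeper
Recent Kafka releases bundle ZooKeeper, so for a simple demo it is enough to install Kafka alone.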
Step 1: Download Kafka and extract it to /usr/local/kafka
Step 2: Create the directory /tmp/kafka-logs
This directory is needed because config/server.properties points the broker's log directory at it:

    log.dirs=/tmp/kafka-logs
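As a rough sketch of how Step 2 could be automated, the Java snippet below reads the log.dirs entry from the contents of server.properties and creates each listed directory. The class name LogDirs and the method parseLogDirs are hypothetical names chosen for illustration, and the default file path simply assumes the /usr/local/kafka install location used in Step 1. It relies on log.dirs being a comma-separated list of directories.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: prepares the directories named in log.dirs.
public class LogDirs {

    // Parses the comma-separated log.dirs value out of server.properties content.
    public static List<Path> parseLogDirs(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<Path> dirs = new ArrayList<>();
        for (String dir : props.getProperty("log.dirs", "").split(",")) {
            if (!dir.trim().isEmpty()) {
                dirs.add(Paths.get(dir.trim()));
            }
        }
        return dirs;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location, matching the install path from Step 1.
        String file = args.length > 0 ? args[0] : "/usr/local/kafka/config/server.properties";
        String content = new String(Files.readAllBytes(Paths.get(file)), StandardCharsets.UTF_8);
        for (Path dir : parseLogDirs(content)) {
            Files.createDirectories(dir); // no error if the directory already exists
            System.out.println("ready: " + dir);
        }
    }
}
```

Passing a different path as the first argument lets the same sketch be pointed at another server.properties file.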
Step 3: Configure environment variables
Add the following to any one of ~/.bash_profile, ~/.bashrc, or /etc/profile:

    # kafka
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$KAFKA_HOME/bin:$PATH
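Step 4: Start Kafka
Note: ZooKeeper must be running before Kafka is started. ZooKeeper listens on port 2181 and Kafka on port 9092.
Kafka locates ZooKeeper through this setting in config/server.properties:

    zookeeper.connect=localhost:2181

From /usr/local/kafka, start the bundled ZooKeeper first and then the broker; -daemon runs each process in the background:

    zookeeper-server-start.sh -daemon config/zookeeper.properties
    kafka-server-start.sh -daemon config/server.properties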
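Because the broker starts in the background, it can be handy to confirm that something is actually listening on ports 2181 and 9092 before moving on. The following is a minimal sketch using only the JDK; the class name PortCheck and the method isListening are hypothetical, and a successful TCP connection only shows that the port is open, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: probes a TCP port with a plain socket connect.
public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the note above: ZooKeeper on 2181, Kafka on 9092.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```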
Step 5: Verify
1. Create a topic and view its details (on Kafka 3.0 and later, kafka-topics.sh no longer accepts --zookeeper; use --bootstrap-server localhost:9092 instead)

    [johuer@pd11 kafka]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo8
    Created topic "demo8".
    [johuer@pd11 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo8
    Topic:demo8    PartitionCount:1    ReplicationFactor:1    Configs:
        Topic: demo8    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

2. Publish messages to the topic

    [johuer@pd11 kafka]$ kafka-console-producer.sh --broker-list localhost:9092 --topic demo8
    >hello
    >everyone

3. Read messages from the topic

    [johuer@pd11 kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo8 --from-beginning
    hello
    everyone
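Java example

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaDemoProducer {
        private static final Logger log = LoggerFactory.getLogger(KafkaDemoProducer.class);

        public static void main(String[] args) {
            produce();
            consume();
        }

        // Sends a single record to topic demo9 and waits for the broker to acknowledge it.
        static void produce() {
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers", "10.211.55.11:9092");
            kafkaProps.put("key.serializer", StringSerializer.class.getName());
            kafkaProps.put("value.serializer", StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
            ProducerRecord<String, String> record = new ProducerRecord<>("demo9", "good", "java-kafka");
            try {
                // get() blocks until the send completes, making this a synchronous send.
                producer.send(record).get();
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.close();
        }

        // Subscribes to topic demo9 and logs every record it receives, polling forever.
        static void consume() {
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers", "10.211.55.11:9092");
            kafkaProps.put("group.id", "demo-group9");
            kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
            kafkaProps.put("value.deserializer", StringDeserializer.class.getName());
            // A new group starts at the latest offset by default and would miss the
            // record sent by produce(); start from the earliest offset instead.
            kafkaProps.put("auto.offset.reset", "earliest");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
            consumer.subscribe(Collections.singletonList("demo9"));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        log.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(),
                                record.key(), record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }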