300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > KAFKA 同步和异步消息的发送(开发实战)

KAFKA 同步和异步消息的发送(开发实战)

时间:2020-07-18 10:23:40

相关推荐

KAFKA 同步和异步消息的发送(开发实战)

文章目录

一、消费者监听1. 启动zk2. 启动kafka3. 创建主题4. 消费者监听消息二、生产者工程2.1. 依赖2.2. 生产者代码(同步)2.3. 生产者代码(异步)2.4. 发送消息2.5. 消费者监听消息2.6. 结果返回
一、消费者监听
1. 启动zk

zkServer.sh start# 监听运行状态zkServer.sh status

2. 启动kafka

# 后台启动kafkakafka-server-start.sh -daemon /app/kafka_2.12-2.8.0/config/server.properties

3. 创建主题

# 创建一个主题名称为topic_1 该主题分区1个分区 ,该分区有1个副本kafka-topics.sh --zookeeper localhost:2181/mykafka --create --topic topic_1 --partitions 1 --replication-factor 1

4. 消费者监听消息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1

二、生产者工程
2.1. 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.1</version></dependency>

2.2. 生产者代码(同步)

package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.mon.header.Header;import org.mon.header.internals.RecordHeader;import org.mon.serialization.IntegerSerializer;import org.mon.serialization.StringSerializer;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<String, Object>();//指定初始化连接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.102:9092");//指定key序列化类configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//构造生产者对象 指定发送的key和value的类型 配置的参数列表(必填参数+辅助参数)KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);//用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));//构造record封装发送消息主体ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", //指定发送主题0,//指定发送分区0,//指定发送key"hello gblfy 0",//指定发送消息主题headers//用于设置用户自定义的消息头字段);//消息的同步确认 调用send方法发送消息final Future<RecordMetadata> future = producer.send(record);//调用get方法接收消息final RecordMetadata metadata = future.get();System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());//关闭生产者producer.close();}}

2.3. 生产者代码(异步)

package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.*;import org.mon.header.Header;import org.mon.header.internals.RecordHeader;import org.mon.serialization.IntegerSerializer;import org.mon.serialization.StringSerializer;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<String, Object>();//指定初始化连接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.102:9092");//指定key序列化类configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//构造生产者对象 指定发送的key和value的类型 配置的参数列表(必填参数+辅助参数)KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);//用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));//构造record封装发送消息主体ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", //指定发送主题0,//指定发送分区0,//指定发送key"hello gblfy 0",//指定发送消息主题headers//用于设置用户自定义的消息头字段);//消息的同步确认 调用send方法发送消息final Future<RecordMetadata> future = producer.send(record);//调用get方法接收消息// final RecordMetadata metadata = future.get();// System.out.println("消息的主题:" + metadata.topic());// System.out.println("消息的分区:" + metadata.partition());// System.out.println("消息的偏移量:" + metadata.offset());//消息的异步确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());} else {System.out.println("异常消息");}}});//关闭生产者producer.close();}}

2.4. 发送消息

消息有同步发送和异步发送二种

2.5. 消费者监听消息
2.6. 结果返回

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。