
Sending and Consuming Messages with Kafka - A Java Example

Date: 2019-05-08 10:43:59


Once the basic components are up and running via the command-line tools, using the Java client is straightforward. This is a first, introductory Java client program, and there is still plenty in it that deserves deeper study.

Dependency Configuration

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

Note that the Scala version suffix of Kafka's artifactId (for example, I use kafka_2.11) must match the Scala version installed on the machine; otherwise you will get the following error.

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at kafka.utils.Pool.<init>(Pool.scala:28)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:91)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at com.vonzhou.learn.message.KafkaConsumer.<init>(KafkaConsumer.java:23)
    at com.vonzhou.learn.message.KafkaConsumer.main(KafkaConsumer.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 more
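
For example, if the machine has Scala 2.10 installed (and the broker build matches it), the dependency coordinate would change accordingly. This is only a sketch of the adjusted artifactId, everything else stays the same:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>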

Sending Messages with the Producer

import java.util.Properties;
import java.util.concurrent.Executors;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MessageProducer {
    private Producer<String, String> producer;

    public static void main(String[] args) {
        new MessageProducer().start();
    }

    public void init() {
        Properties props = new Properties();
        /**
         * Used only for bootstrapping: the producer uses it to obtain metadata
         * (topic, partition, replicas). The socket actually used for sending
         * messages is determined from the returned metadata.
         */
        props.put("metadata.broker.list", "localhost:9092");
        /**
         * Serializer class for the messages.
         * The default is kafka.serializer.DefaultEncoder, which takes a byte[]
         * and returns the same byte array.
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /**
         * Whether the producer waits for an ACK from the broker after sending; the default is 0.
         * 1 means wait for the ACK, which ensures message reliability.
         */
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        // The generic parameters: the first is the type of the partition key, the second the type of the message
        producer = new Producer<String, String>(config);
    }

    public void produceMsg() {
        // Build the message to send
        long timestamp = System.currentTimeMillis();
        String msg = "Msg" + timestamp;
        String topic = "test"; // make sure this topic exists
        System.out.println("Sending message " + msg);
        String key = "Msg-Key" + timestamp;
        /**
         * topic:   the topic of the message
         * key:     the message key, also used as the partition key
         * message: the message to send
         */
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, msg);
        producer.send(data);
    }

    public void start() {
        System.out.println("Start sending messages ...");
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            public void run() {
                init();
                while (true) {
                    try {
                        produceMsg();
                        Thread.sleep(2000);
                    } catch (Throwable e) {
                        if (producer != null) {
                            try {
                                producer.close();
                            } catch (Throwable e1) {
                                System.out.println("Turn off Kafka producer error! " + e);
                            }
                        }
                    }
                }
            }
        });
    }
}
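
As a small extension of the example above: besides send(KeyedMessage), the kafka.javaapi.producer.Producer used here also accepts a List of KeyedMessage, so several messages can be handed over in a single call. The following is a minimal, self-contained sketch of that batch overload (assuming the same broker at localhost:9092 and the same "test" topic as above); it is an illustration, not part of the original program.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchProducerExample {
    public static void main(String[] args) {
        // Same basic producer configuration as in MessageProducer above
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Collect several messages and send them in one call
        List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 10; i++) {
            batch.add(new KeyedMessage<String, String>("test", "Key-" + i, "Msg-" + i));
        }
        producer.send(batch); // the send(List<KeyedMessage<K, V>>) overload
        producer.close();
    }
}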

Consuming Messages with the Consumer

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MessageConsumer {
    private ConsumerConnector consumer;
    private String topic;

    public static void main(String[] arg) {
        new MessageConsumer().start();
    }

    public void init() {
        // The ZooKeeper address
        String zookeeper = "localhost:2181";
        String topic = "test";
        String groupId = "test-group";
        Properties props = new Properties();
        /**
         * Required configuration.
         */
        props.put("zookeeper.connect", zookeeper);
        /**
         * Required configuration: the consumer group this consumer belongs to.
         */
        props.put("group.id", groupId);
        /**
         * If no heartbeat reaches ZooKeeper for this long, the consumer is
         * considered dead; the default is 6000.
         */
        props.put("zookeeper.session.timeout.ms", "6000");
        /**
         * How far a ZooKeeper follower is allowed to lag behind the leader.
         */
        props.put("zookeeper.sync.time.ms", "200");
        /**
         * How often consumer offsets are committed to ZooKeeper; the default is 60 * 1000.
         */
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        /**
         * createMessageStreams creates the message streams for each topic.
         */
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            try {
                String message = new String(iterator.next().message());
                System.out.println("Received message " + message);
            } catch (Throwable e) {
                System.out.println(e.getCause());
            }
        }
    }

    public void start() {
        System.out.println("Start consuming messages ...");
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            public void run() {
                init();
                while (true) {
                    try {
                        consume();
                    } catch (Throwable e) {
                        if (consumer != null) {
                            try {
                                consumer.shutdown();
                            } catch (Throwable e1) {
                                System.out.println("Turn off Kafka consumer error! " + e);
                            }
                        }
                    }
                }
            }
        });
    }
}
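
The example above asks for a single stream (topicCountMap.put(topic, 1)) and therefore consumes on one thread. The high-level consumer can also hand back several streams, one per thread, so that the partitions of a topic are consumed in parallel. Below is a minimal sketch of that pattern (assuming the same ZooKeeper address, group id, and "test" topic as above); it is an illustration of the API, not part of the original program.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadConsumerExample {
    public static void main(String[] args) {
        // Same required configuration as in MessageConsumer above
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-group");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for several streams; each stream is consumed by its own thread.
        // A stream only receives data if the topic has enough partitions.
        int numStreams = 3;
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", numStreams);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);

        ExecutorService pool = Executors.newFixedThreadPool(numStreams);
        for (final KafkaStream<byte[], byte[]> stream : streams.get("test")) {
            pool.execute(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println("Received message " + new String(it.next().message()));
                    }
                }
            });
        }
    }
}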

References

Configuration parameter reference

Kafka 0.8.2 Documentation
