博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka入门样例 for java
阅读量:5030 次
发布时间:2019-06-12

本文共 3845 字,大约阅读时间需要 12 分钟。

1,生产者

import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class TestProducer {            public static void main(String[] args) {              Properties props = new Properties();              props.setProperty("metadata.broker.list","10.XX.XX.XX:9092");              props.setProperty("serializer.class","kafka.serializer.StringEncoder");              props.put("request.required.acks","1");              ProducerConfig config = new ProducerConfig(props);              Producer
producer = new Producer
(config); KeyedMessage
data = new KeyedMessage
("mykafka","test-kafka"); try { int i =1; while(i < 1000){ producer.send(data); } } catch (Exception e) { e.printStackTrace(); } producer.close(); } }
2。消费者

import java.util.HashMap;import java.util.List;  import java.util.Map;  import java.util.Properties;    import kafka.consumer.ConsumerConfig;  import kafka.consumer.ConsumerIterator;  import kafka.consumer.KafkaStream;  import kafka.javaapi.consumer.ConsumerConnector; public class TestConsumer extends Thread{          private final ConsumerConnector consumer;          private final String topic;            public static void main(String[] args) {              TestConsumer consumerThread = new TestConsumer("mykafka");              consumerThread.start();          }          public TestConsumer(String topic) {              consumer =kafka.consumer.Consumer                      .createJavaConsumerConnector(createConsumerConfig());              this.topic =topic;          }        private static ConsumerConfig createConsumerConfig() {          Properties props = new Properties();          props.put("zookeeper.connect","10.XX.XX.XX:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");          props.put("group.id", "0");          props.put("zookeeper.session.timeout.ms","10000");          return new ConsumerConfig(props);      }        public void run(){          Map
topickMap = new HashMap
(); topickMap.put(topic, 1); Map
>> streamMap =consumer.createMessageStreams(topickMap); KafkaStream
stream = streamMap.get(topic).get(0); ConsumerIterator
it =stream.iterator(); System.out.println("*********Results********"); while(true){ if(it.hasNext()){ System.err.println("get data:" +new String(it.next().message())); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
3,分别启动生产者和消费者,在消费者输出中看到下日志即成功

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).log4j:WARN Please initialize the log4j system properly.*********Results********get data:test-kafka
4。启动生产者假设报错例如以下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).log4j:WARN Please initialize the log4j system properly.kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)	at kafka.producer.Producer.send(Producer.scala:76)	at kafka.javaapi.producer.Producer.send(Producer.scala:33)	at ProducerTest.main(TestProducer.java:21)
须要修改config目录下的server.properties中的下面两个属性
zookeeper.connect=localhost:2181改成zookeeper.connect=10.0.30.221:2181 以及默认凝视掉的#host.name=localhost改成host.name=10.0.30.221

转载于:https://www.cnblogs.com/zsychanpin/p/6991640.html

你可能感兴趣的文章
函数中关于const关键字使用的注意事项
查看>>
Web项目中的路径问题
查看>>
js随机数的取整
查看>>
十大经典预测算法(六)---集成学习(模型融合算法)
查看>>
用php做一个简单的注册用户功能
查看>>
一款基于css3的3D图片翻页切换特效
查看>>
Feign使用Hystrix无效原因及解决方法
查看>>
Sizeof与Strlen的区别与联系
查看>>
hadoop2.2.0_hbase0.96_zookeeper3.4.5全分布式安装文档下载
查看>>
Flutter 贝塞尔曲线切割
查看>>
golang 的编译安装以及supervisord部署
查看>>
阿里架构师,讲述基于微服务的软件架构模式
查看>>
Eclipse导入maven项目时,Pom.xml文件报错处理方法
查看>>
01、JAVA开发准备
查看>>
Jenkins+Maven+SVN快速搭建持续集成环境(转)
查看>>
txmpp
查看>>
【Github教程】史上最全github使用方法:github入门到精通
查看>>
抽象工厂模式(Abstract Factory)
查看>>
luogu1373 小a和uim之大逃离 (dp)
查看>>
Redis的Pub/Sub客户端实现
查看>>