kafka(3)API使用
2021-07-12 03:07
标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final
相关依赖
kafka_2.11
kafka-clients
一个简单的Kafka生产者一般步骤如下:
创建 Properties 对象,设置生产者级别配置。以下3个配置是必须指定的。
(1)
bootstrap.servers:配置连接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址。当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上 Kafka 集群,在多代理集群的情况下建议至少配置两个代理。
key.serializer :配置用于序列化消息 Key 的类。
value.serializer :配置用于序列化消息实际数据的类。
(2)根据 Properties 对象实例化一个 KafkaProducer 对象。
(3)实例化 ProducerRecord 对象, 每条消息对应一个 ProducerRecord 对象。
(4)调用 KafkaProducer 发送消息的方法将 ProducerRecord 发送到 Kafka 相应节点。Kafka 提供了两个发送消息的方法,即 send(ProducerRecord&lt;K, V&gt; record) 和带回调的 send(ProducerRecord&lt;K, V&gt; record, Callback callback)。
package com.kafka.action.chapter6.producer;

import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.kafka.action.chapter6.dto.StockQuotationinfo;

/**
 * Single-threaded Kafka producer demo: publishes {@link #MSG_SIZE} randomly
 * generated stock quotations to the {@code test} topic, using the asynchronous
 * {@code send(record, callback)} API.
 *
 * NOTE(review): the scraped source stripped every "&lt;...&gt;" span (generic type
 * parameters, plus part of createQuotationinfo() and the start of main()).
 * Those stretches were reconstructed and are marked below — confirm against
 * the original listing.
 *
 * @author licl
 * @date 2018-09-09
 */
public class QuotationProducer {

    /** Total number of messages this demo produces. */
    private static final int MSG_SIZE = 100;
    /** Destination topic name. */
    private static final String TOPIC = "test";
    /** Kafka broker list (host:port). */
    private static final String BROKER_LIST = "192.168.1.106:9092";

    /** One shared producer for the class; KafkaProducer is thread-safe. */
    private static KafkaProducer<String, String> producer = null;

    static {
        // 1. Build the Properties used to instantiate the KafkaProducer.
        Properties configs = initConfig();
        // 2. Initialize the KafkaProducer once for the whole class.
        producer = new KafkaProducer<String, String>(configs);
    }

    /**
     * Builds the minimal producer configuration: the three mandatory settings —
     * broker list plus key/value serializer classes.
     */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        return properties;
    }

    /**
     * Creates one random stock quotation. Stock code = 600100 plus a random
     * int in [0, 10); prices are derived from a random float in [0, 1).
     */
    private static StockQuotationinfo createQuotationinfo() {
        StockQuotationinfo quotationinfo = new StockQuotationinfo();
        // Random integer in [0, 10) added to 600100 forms the stock code.
        Random r = new Random();
        Integer stockCode = 600100 + r.nextInt(10);
        // Random float in [0, 1) drives the simulated price movement.
        float random = (float) Math.random();
        // Rise/fall rule: flip the sign for roughly half of the samples.
        // NOTE(review): everything from here to main()'s first statement was
        // truncated in the scraped source; the lines below are a reconstruction.
        if (random / 2 < 0.25) {
            random = random * -1;
        }
        DecimalFormat decimalFormat = new DecimalFormat(".00");
        quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat.format(11 + random)));
        quotationinfo.setPreClosePrice(11.80f);
        quotationinfo.setOpenPrice(11.5f);
        quotationinfo.setLowPrice(10.5f);
        quotationinfo.setHighPrice(12.5f);
        quotationinfo.setStockCode(stockCode.toString());
        quotationinfo.setTradeTime(System.currentTimeMillis());
        quotationinfo.setStockName("股票-" + stockCode);
        return quotationinfo;
    }

    public static void main(String[] args) {
        ProducerRecord<String, String> record = null;
        StockQuotationinfo quotationinfo = null;
        try {
            int num = 0;
            for (int i = 0; i < MSG_SIZE; i++) {
                quotationinfo = createQuotationinfo();
                // Key = stock code, value = full quotation string; the record
                // timestamp carries the trade time.
                record = new ProducerRecord<String, String>(TOPIC, null,
                        quotationinfo.getTradeTime(), quotationinfo.getStockCode(),
                        quotationinfo.toString());
                // Asynchronous send with a per-record callback.
                // 1. Plain fire-and-forget alternative:
                // producer.send(record);
                // 2. Callback variant, reporting offset/partition on success.
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.out.println("Send message occurs exception");
                            exception.printStackTrace();
                        }
                        if (exception == null) {
                            System.out.println(String.format("offset:%s,partition:%s",
                                    metadata.offset(), metadata.partition()));
                        }
                    }
                });
                if (num++ % 10 == 0) {
                    // Pause 2s every 10 messages so the demo output is readable.
                    Thread.sleep(2000L);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Flush buffered records and release network resources.
            producer.close();
        }
    }
}
package com.kafka.action.chapter6.producer;

import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.kafka.action.chapter6.dto.StockQuotationinfo;

/**
 * Multi-threaded producer demo: each task sends one pre-built record through
 * a shared (thread-safe) KafkaProducer via a fixed thread pool.
 *
 * NOTE(review): the scraped source stripped every "&lt;...&gt;" span, losing the
 * generics, run(), and most of main(); those parts are reconstructed below.
 * Fix applied to visible code: the original declared
 * {@code ExecutorService executor = Executors.newFixedThreadPool(10)} as an
 * INSTANCE field, so every submitted task object allocated its own 10-thread
 * pool. The pool now lives in main() and is shared.
 *
 * @date 2018-09-09
 */
public class KafkaProducerThread implements Runnable {

    /** Total number of messages the demo produces. */
    private static final int MSG_SIZE = 100;
    /** Destination topic name. */
    private static final String TOPIC = "test";
    /** Kafka broker list (host:port). */
    private static final String BROKER_LIST = "192.168.1.106:9092";

    /** Shared producer, injected via the constructor. */
    private KafkaProducer<String, String> producer = null;
    /** The single record this task will send. */
    private ProducerRecord<String, String> record = null;

    public KafkaProducerThread(KafkaProducer<String, String> producer,
            ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    /** Sends the record asynchronously, logging offset/partition on success. */
    @Override
    public void run() {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("Send message occurs exception");
                    exception.printStackTrace();
                }
                if (exception == null) {
                    System.out.println(String.format("offset:%s,partition:%s",
                            metadata.offset(), metadata.partition()));
                }
            }
        });
    }

    /**
     * Creates one random stock quotation (same rules as QuotationProducer).
     * NOTE(review): the tail of this method was truncated in the scraped
     * source; reconstructed.
     */
    private static StockQuotationinfo createQuotationinfo() {
        StockQuotationinfo quotationinfo = new StockQuotationinfo();
        // Random integer in [0, 10) added to 600100 forms the stock code.
        Random r = new Random();
        Integer stockCode = 600100 + r.nextInt(10);
        // Random float in [0, 1) drives the simulated price movement.
        float random = (float) Math.random();
        // Rise/fall rule: flip the sign for roughly half of the samples.
        if (random / 2 < 0.25) {
            random = random * -1;
        }
        DecimalFormat decimalFormat = new DecimalFormat(".00");
        quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat.format(11 + random)));
        quotationinfo.setPreClosePrice(11.80f);
        quotationinfo.setOpenPrice(11.5f);
        quotationinfo.setLowPrice(10.5f);
        quotationinfo.setHighPrice(12.5f);
        quotationinfo.setStockCode(stockCode.toString());
        quotationinfo.setTradeTime(System.currentTimeMillis());
        quotationinfo.setStockName("股票-" + stockCode);
        return quotationinfo;
    }

    public static void main(String[] args) {
        // Minimal mandatory producer configuration.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(properties);
        // One shared pool for all send tasks (was per-instance — see class doc).
        ExecutorService executor = Executors.newFixedThreadPool(10);
        long current = System.currentTimeMillis();
        try {
            for (int i = 0; i < MSG_SIZE; i++) {
                StockQuotationinfo quotationinfo = createQuotationinfo();
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                        TOPIC, null, quotationinfo.getTradeTime(),
                        quotationinfo.getStockCode(), quotationinfo.toString());
                executor.submit(new KafkaProducerThread(producer, record));
            }
            System.out.println("elapsed ms: " + (System.currentTimeMillis() - current));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
            executor.shutdown();
        }
    }
}
package com.kafka.action.chapter6.dto;

import java.io.Serializable;

/**
 * Plain data holder for one stock quotation: code, name, trade time, and the
 * previous-close / open / current / high / low prices.
 */
public class StockQuotationinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Stock code. */
    private String stockCode;
    /** Stock name. */
    private String stockName;
    /** Trade time (epoch millis). */
    private long tradeTime;
    /** Previous day's closing price. */
    private float preClosePrice;
    /** Opening price. */
    private float openPrice;
    /** Current price; at market close this is the closing price. */
    private float currentPrice;
    /** Today's high. */
    private float highPrice;
    /** Today's low. */
    private float lowPrice;

    /** No-arg constructor for frameworks and incremental population. */
    public StockQuotationinfo() {
        super();
    }

    /** All-args constructor. */
    public StockQuotationinfo(String stockCode, String stockName, long tradeTime,
            float preClosePrice, float openPrice, float currentPrice,
            float highPrice, float lowPrice) {
        super();
        this.stockCode = stockCode;
        this.stockName = stockName;
        this.tradeTime = tradeTime;
        this.preClosePrice = preClosePrice;
        this.openPrice = openPrice;
        this.currentPrice = currentPrice;
        this.highPrice = highPrice;
        this.lowPrice = lowPrice;
    }

    public String getStockCode() {
        return stockCode;
    }

    public void setStockCode(String stockCode) {
        this.stockCode = stockCode;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public long getTradeTime() {
        return tradeTime;
    }

    public void setTradeTime(long tradeTime) {
        this.tradeTime = tradeTime;
    }

    public float getPreClosePrice() {
        return preClosePrice;
    }

    public void setPreClosePrice(float preClosePrice) {
        this.preClosePrice = preClosePrice;
    }

    public float getOpenPrice() {
        return openPrice;
    }

    public void setOpenPrice(float openPrice) {
        this.openPrice = openPrice;
    }

    public float getCurrentPrice() {
        return currentPrice;
    }

    public void setCurrentPrice(float currentPrice) {
        this.currentPrice = currentPrice;
    }

    public float getHighPrice() {
        return highPrice;
    }

    public void setHighPrice(float highPrice) {
        this.highPrice = highPrice;
    }

    public float getLowPrice() {
        return lowPrice;
    }

    public void setLowPrice(float lowPrice) {
        this.lowPrice = lowPrice;
    }

    public static long getSerialversionuid() {
        return serialVersionUID;
    }

    @Override
    public String toString() {
        // Field order here is fixed: producers send this string as the
        // message value, so downstream consumers depend on the exact format.
        return "StockQuotationinfo [stockCode=" + stockCode + ", stockName=" + stockName
                + ", tradeTime=" + tradeTime + ", preClosePrice=" + preClosePrice
                + ", openPrice=" + openPrice + ", currentPrice=" + currentPrice
                + ", highPrice=" + highPrice + ", lowPrice=" + lowPrice + "]";
    }
}
消费者
package demo2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

/**
 * Consumer demos: automatic offset commit vs. manual batched commit.
 *
 * NOTE(review): the scraped source stripped every "&lt;...&gt;" span; the
 * {@code <String, String>} generic parameters below are restored.
 */
public class MyKafkaConsumer {

    /**
     * Consumes with enable.auto.commit=true: offsets are committed in the
     * background every auto.commit.interval.ms (1s).
     */
    @Test
    public void comsumeMsgAutoCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            sleep(1);
        }
    }

    /**
     * Manual commit: buffers records and calls commitSync() only after a batch
     * of minBatchSize records has been "persisted" — at-least-once semantics
     * (a crash before the commit re-delivers the whole batch).
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("max.poll.records", 10);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        final int minBatchSize = 100;
        List<ConsumerRecord<String, String>> buffer =
                new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
                buffer.add(record);
                System.out.println(buffer.size());
            }
            if (buffer.size() >= minBatchSize) {
                System.out.println("进入手动提交offset");
                insertIntoDb(buffer);
                // Commit only after the batch was processed successfully.
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    /** Stand-in for real persistence: just prints each buffered record. */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> record : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }

    /** Sleeps the given number of seconds, preserving the interrupt status. */
    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            // Fix: restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
package demo2;

// NOTE(review): InetAddress is unused in this class; kept because other parts
// of the (unseen) demo may rely on it — verify, then remove.
import java.net.InetAddress;

/**
 * Shared connection constants for the demo2 Kafka examples.
 * Fields are intentionally package-private: only demo2 classes read them.
 */
public final class Constants {

    /** Consumer group id used by MyKafkaConsumer. */
    final static String GROUP_ID = "test_group";
    /** Topic the demos produce to / consume from. */
    final static String MY_TOPIC = "test";
    // NOTE(review): "ADRESS" is a typo for "ADDRESS", but renaming would break
    // MyKafkaConsumer; name kept for compatibility.
    final static String KAFKA_SERVER_ADRESS = "192.168.1.106";
    /** Kafka broker port. */
    final static int KAFKA_SERVER_PORT = 9092;

    private Constants() {
        // Constants holder — not instantiable.
    }
}
kafka(3)API使用
标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final
原文地址:https://www.cnblogs.com/liclBlog/p/9613421.html
上一篇:第一个C#程序
下一篇:win10安装JDK