更新時間:2022年01月12日15時35分 來源:傳智教育 瀏覽次數(shù):
在Kafka的topic 「ods_user」中有一些用戶數(shù)據(jù),數(shù)據(jù)格式如下:
| 姓名,性別,出生日期
| 張三,1,1980-10-09
| 李四,0,1985-11-01
我們需要編寫程序,將用戶的性別轉換為男、女(1-男,0-女),轉換后將數(shù)據(jù)寫入到topic 「dwd_user」中。要求使用事務保障,要么消費了數(shù)據(jù)同時寫入數(shù)據(jù)到 topic,提交offset。要么全部失敗。
# 創(chuàng)建名為ods_user和dwd_user的主題
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
# 生產數(shù)據(jù)到 ods_user
bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user
# 從dwd_user消費數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
--from-beginning --isolation-level read_committed
編寫一個方法 createConsumer,該方法中返回一個消費者,訂閱「ods_user」主題。注意:需要配置事務隔離級別、關閉自動提交。
實現(xiàn)步驟:
1. 創(chuàng)建Kafka消費者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
props.setProperty("group.id", "ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
2. 創(chuàng)建消費者,并訂閱 ods_user 主題
//1.創(chuàng)建消費者
publicstaticConsumer<String,String>createConsumer(){
//1.創(chuàng)建Kafka消費者配置
Propertiesprops=newProperties();
props.setProperty("bootstrap.servers","node1.itcast.cn:9092");
props.setProperty("group.id","ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit","false");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//2.創(chuàng)建Kafka消費者
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);
//3.訂閱要消費的主題
consumer.subscribe(Arrays.asList("ods_user"));
returnconsumer;
}
編寫一個方法 createProducer,返回一個生產者對象。注意:需要配置事務的id,開啟了事務會默認開啟冪等性。
1. 創(chuàng)建生產者配置
Propertiesprops=newProperties();
props.put("bootstrap.servers","node1.itcast.cn:9092");
props.put("transactional.id","dwd_user");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
2.創(chuàng)建生產者對象
publicstaticProducer < String, String > createProduceer() {
//1.創(chuàng)建生產者配置
Propertiesprops = newProperties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("transactional.id", "dwd_user");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2.創(chuàng)建生產者
Producer < String, String > producer = newKafkaProducer < > (props);
returnproducer;
}
實現(xiàn)步驟:
1. 調用之前實現(xiàn)的方法,創(chuàng)建消費者、生產者對象
2. 生產者調用initTransactions初始化事務
3. 編寫一個while死循環(huán),在while循環(huán)中不斷拉取數(shù)據(jù),進行處理后,再寫入到指定的topic
(1) 生產者開啟事務
(2) 消費者拉取消息
(3) 遍歷拉取到的消息,并進行預處理(將1轉換為男,0轉換為女)
(4) 生產消息到dwd_user topic中
(5) 提交偏移量到事務中
(6) 提交事務
(7) 捕獲異常,如果出現(xiàn)異常,則取消事務
publicstaticvoidmain(String[] args) {
Consumer < String, String > consumer = createConsumer();
Producer < String, String > producer = createProducer();
//初始化事務
producer.initTransactions();
while (true) {
try {
//1.開啟事務
producer.beginTransaction();
//2.定義Map結構,用于保存分區(qū)對應的offset
Map < TopicPartition, OffsetAndMetadata > offsetCommits = newHashMap < > ();
//2.拉取消息
ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord < String, String > record: records) {
//3.保存偏移量
offsetCommits.put(newTopicPartition(record.topic(), record.partition()), newOffsetAndMetadata(record.offset() + 1));
//4.進行轉換處理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女";
Stringmessage = fields[0] + "," + fields[1] + "," + fields[2];
//5.生產消息到dwd_user
producer.send(newProducerRecord < > ("dwd_user", message));
}
//6.提交偏移量到事務
producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
//7.提交事務
producer.commitTransaction();
} catch (Exceptione) {
//8.放棄事務
producer.abortTransaction();
}
}
}
往之前啟動的console-producer中寫入消息進行測試,同時檢查console-consumer是否能夠接收到消息:
逐個測試一下消息:
//3.保存偏移量
offsetCommits.put(newTopicPartition(record.topic(),record.partition()),
newOffsetAndMetadata(record.offset()+1));
//4.進行轉換處理
String[]fields=record.value().split(",");
fields[1]=fields[1].equalsIgnoreCase("1")?"男":"女";
Stringmessage=fields[0]+","+fields[1]+","+fields[2];
//模擬異常
inti=1/0;
//5.生產消息到dwd_user
producer.send(newProducerRecord<>("dwd_user",message));
啟動程序一次,拋出異常。
再啟動程序一次,還是拋出異常。
直到我們處理該異常為止。
我們發(fā)現(xiàn),可以消費到消息,但如果中間出現(xiàn)異常的話,offset是不會被提交的,除非消費、生產消息都成功,才會提交事務。