教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Kafka怎樣創(chuàng)建事務編程?【完整代碼編寫和測試流程】

更新時間:2022年01月12日15時35分 來源:傳智教育 瀏覽次數(shù):

需求

在Kafka的topic 「ods_user」中有一些用戶數(shù)據(jù),數(shù)據(jù)格式如下:

| 姓名,性別,出生日期

| 張三,1,1980-10-09

| 李四,0,1985-11-01

我們需要編寫程序,將用戶的性別轉換為男、女(1-男,0-女),轉換后將數(shù)據(jù)寫入到topic 「dwd_user」中。要求使用事務保障,要么消費了數(shù)據(jù)同時寫入數(shù)據(jù)到 topic,提交offset。要么全部失敗。

啟動生產者控制臺程序模擬數(shù)據(jù)

# 創(chuàng)建名為ods_user和dwd_user的主題
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
# 生產數(shù)()到 ods_user
bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user
# 從dwd_user消費數(shù)()
bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user 
--from-beginning  --isolation-level read_committed

編寫創(chuàng)建消費者代碼

編寫一個方法 createConsumer,該方法中返回一個消費者,訂閱「ods_user」主題。注意:需要配置事務隔離級別、關閉自動提交。

實現(xiàn)步驟:

1. 創(chuàng)建Kafka消費者配置

Properties props = new Properties();
 props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
 props.setProperty("group.id", "ods_user");
 props.put("isolation.level","read_committed");
 props.setProperty("enable.auto.commit", "false");
 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2. 創(chuàng)建消費者,并訂閱 ods_user 主題

//1.(chuàng)建消費者
publicstaticConsumer<String,String>createConsumer(){
//1.(chuàng)建Kafka消費者配置
Propertiesprops=newProperties();
props.setProperty("bootstrap.servers","node1.itcast.cn:9092");
props.setProperty("group.id","ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit","false");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

//2.(chuàng)建Kafka消費者
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

//3.訂閱要消費的主題
consumer.subscribe(Arrays.asList("ods_user"));

returnconsumer;
}

編寫創(chuàng)建生產者代碼

編寫一個方法 createProducer,返回一個生產者對象。注意:需要配置事務的id,開啟了事務會默認開啟冪等性。

1. 創(chuàng)建生產者配置

Propertiesprops=newProperties();
props.put("bootstrap.servers","node1.itcast.cn:9092");
props.put("transactional.id","dwd_user");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

2.創(chuàng)建生產者對象

publicstaticProducer < String, String > createProduceer() {
    //1.(chuàng)建生產者配置
    Propertiesprops = newProperties();
    props.put("bootstrap.servers", "node1.itcast.cn:9092");
    props.put("transactional.id", "dwd_user");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //2.(chuàng)建生產者
    Producer < String, String > producer = newKafkaProducer < > (props);
    returnproducer;
}

編寫代碼消費并生產數(shù)據(jù)

實現(xiàn)步驟:

1. 調用之前實現(xiàn)的方法,創(chuàng)建消費者、生產者對象

2. 生產者調用initTransactions初始化事務

3. 編寫一個while死循環(huán),在while循環(huán)中不斷拉取數(shù)據(jù),進行處理后,再寫入到指定的topic

(1) 生產者開啟事務

(2) 消費者拉取消息

(3) 遍歷拉取到的消息,并進行預處理(將1轉換為男,0轉換為女)

(4) 生產消息到dwd_user topic中

(5) 提交偏移量到事務中

(6) 提交事務

(7) 捕獲異常,如果出現(xiàn)異常,則取消事務

publicstaticvoidmain(String[] args) {
    Consumer < String, String > consumer = createConsumer();
    Producer < String, String > producer = createProducer();
    //初始化事務
    producer.initTransactions();
    while (true) {
        try {
            //1.開啟事務
            producer.beginTransaction();
            //2.定義Map結構,用于保存分區()對應的offset
            Map < TopicPartition, OffsetAndMetadata > offsetCommits = newHashMap < > ();
            //2.拉取消息
            ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord < String, String > record: records) {
                //3.保存偏移量
                offsetCommits.put(newTopicPartition(record.topic(), record.partition()), newOffsetAndMetadata(record.offset() + 1));
                //4.進行轉換處理
                String[] fields = record.value().split(",");
                fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女";
                Stringmessage = fields[0] + "," + fields[1] + "," + fields[2];
                //5.生產消息到dwd_user
                producer.send(newProducerRecord < > ("dwd_user", message));
            }
            //6.提交偏移量到事務
            producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
            //7.提交事務
            producer.commitTransaction();
        } catch (Exceptione) {
            //8.放棄事務
            producer.abortTransaction();
        }
    }
}

測試

往之前啟動的console-producer中寫入消息進行測試,同時檢查console-consumer是否能夠接收到消息:

kafka 創(chuàng)建事務編程

逐個測試一下消息:

  • 張三,1,1980-10-09
  • 李四,0,1985-11-01
//3.保存偏移量
offsetCommits.put(newTopicPartition(record.topic(),record.partition()),
       newOffsetAndMetadata(record.offset()+1));
//4.進行轉換處理
String[]fields=record.value().split(",");
fields[1]=fields[1].equalsIgnoreCase("1")?"男":"女";
Stringmessage=fields[0]+","+fields[1]+","+fields[2];

//模擬異常
inti=1/0;

//5.生產消息到dwd_user
producer.send(newProducerRecord<>("dwd_user",message));

啟動程序一次,拋出異常。

再啟動程序一次,還是拋出異常。

直到我們處理該異常為止。

我們發(fā)現(xiàn),可以消費到消息,但如果中間出現(xiàn)異常的話,offset是不會被提交的,除非消費、生產消息都成功,才會提交事務。






猜你喜歡:

怎樣一鍵啟動或關閉Kafka?有快捷的方法嗎?

為什么選擇kafka采集數(shù)據(jù)?

Kafka的常用API介紹[大數(shù)據(jù)培訓]

SparkStreaming連接Kafka兩種方式

傳智教育python+大數(shù)據(jù)開發(fā)培訓

0 分享到:
和我們在線交談!