Kafka 幂等和事务消息实践

producer 可能给多个 topic,多个 partition 发送消息,这些消息组成一个事务,这些消息需要对consumer同时可见或者同时不可见。Kafka事务需要在producer端处理,consumer端不需要做特殊处理,跟普通消息消费一样

https://www.cnblogs.com/enhance/p/17145168.html

1、事务流程

整个流程步骤:

  1. 事务初始化: InitTransactions,事务初始化是一次性的而事务开启、提交/回滚则一致循环运行
  2. 开启事务: beginTransaction
  3. 发送消息,向n个topic发送多条:producer.send
  4. 提交事务: commitTransaction
  5. 回滚事务:abortTransaction

2、事务配置

Producer Consumer
transactional.id 事务ID,类型为String字符串,默认为空,客户端自定义,例如”order_bus” isolation.level 事务隔离级别,默认为空,开启事务的话,需要将其设置为”read_committed”
enable.idempotence 消息幂等开关,true/false,默认为false,当配置了transactional.id,此项一定要设置为true,否则会抛出客户端配置异常
transaction.timeout.ms 事务超时时间,默认为10秒,最长为15分钟

enable.idempotence设置为true时,kafka会检查如下一些级联配置

配置项 内容要求 说明
acks 要求此配置项必须设置为all 响应必须要设置为all,也就是leader存储消息,并且所有follower也存储了消息后再返回,保证消息的可靠性
retries > 0 因为幂等特性保证了数据不会重复,在需要强可靠性的前提下,需要用户设置的重试次数 > 0
max.in.flight.requests.per.connection <= 5 此项配置是表明在producer还未收到broker应答的最大消息批次数量。该值设置的越大,标识可允许的吞吐越高,同时也越容易造成消息乱序

3、事务初始化

参与方:Producer、Broker

事务初始化由producer端触发,执行事务初始化主要做以下两个操作:

a) 定位TransactionCoordinator
b) 初始化producerId

事务初始化代码:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

3.1 定位 TransactionCoordinator(获取tc)

通过producer发送Coordinator请求(请求中包含自定义transactionid)到broker,broker获取到transactionid做hashcode,hashcode对kafka内部topic transactionstate默认分区(默认50)取模,获取分区对应的broker作为transactionCoordinator

TransactionCoordinator可以理解为分布式事务中二(三)阶段提交的事务协调者。Kafka事务中TransactionCoordinator本质也是一个broker,只是这个broker起到针对当前事务的协调作用,所有的事务操作都需要直接发送给这个指定的broker

3.2 初始化ProducerId(Producer向tc获取)

Producer获取到TransactionCoordinator后,便需要向TransactionCoordinator发送请求获取producerId以及Epoch。可以认为producerId+Epoch是对事务型producer唯一标识。

Producer在事务(初始化阶段)启动之前向TransactionCoordinator申请producerId,TransactionCoordinator服务在分配producerId后会将producerId和Epoch持久到事务topic transactionstate中,这样就算producer宕机重启后也能处理未完成的事务

后续producer向broker发送请求也需要携带producerid和Epoch,这两个参数含义如下

参数 类型 含义
ProducerId Long 从0开始,对应Producer端配置的TransactionId,他们存在映射关系,可以通过TransactionId来查询ProducerId;映射关系存储在kafka内部topic _transactionstate中
Epoch Short 从0开始,Producer每次重启,此项值都会+1;当超过short最大值后,ProducerId+1

初始化ProducerId后,transactionCoordinator向transactionstate中写入一条key-value数据(该数据持久化在broker端)此数据key是transactionid,此时事务状态为empty,同时transactionCoordinator向producer返回producerId和Epoch。transactionCoordinator向transactionstate存储的数据格式如下:

4、事务启动

参与方:Producer

producer.beginTransaction();

5、事务消息发送

参与方:Producer、 broker

producer.send()

5.1 消息发送-Producer

Producer在接收到producerId后就可以正常发送消息,不过在发送消息前,需要将这些消息的分区地址上传到transactionCoordinator。transactionCoordinator会将这些分区地址持久化到事务topic _transaction_state(持久化这些消息分区作用是为了后边6.2节中事务提交阶段知道该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送).;

之后Producer便可以向对应分区发送消息

5.2 消息发送-TransactionCoordinator

消息发送阶段,TransactionCoordinator主要记录当前事务消息所在分区信息,即更新_transaction_state状态

5.3 消息发送 -broker

消息发送时,broker 工作是维护 LSO(log stable offset), 一个分区可能存储了多个事务消息,也有可能存储多个非事务普通消息,而 LSO 为第一个正在进行中的事务消息的 offset

如下图:

6、事务提交

参与方:Producer、Broker

producer.commitTransaction()

6.1、事务提交-Producer

事务提交阶段,producer只是触发提交动作,并携带以下所需参数:

  • transactionId:事务Id,即客户自定义字符串
  • producerId:由transaction Coordinator生成,递增
  • epoch:transaction Coordinator生成,递增
  • committed:true

6.2、事务提交-transactionCoordinator

producer提交事务——>TransactionCoordinator收到后,请求将状态修改为PrepareCommit——>TransactionCoordinator向Producer响应——>TransactionCoordinator向各个broker发送control marker消息,broker收到后将消息存储下来,用来比较当前事务已经成功提交——>待各个broker存储control marker后,coordinator将事务状态改为commit,事务结束

7、事务取消

参与方:producer、transactionCoordinator

producer.abortTransaction()

7.1 事务取消-producer

事务取消阶段,producer只是触发取消动作,并携带以下所需参数:

  • transactionId:事务Id,即客户自定义字符串
  • producerId:由transaction Coordinator生成,递增
  • epoch:transaction Coordinator生成,递增
  • committed:false

7.2 事务取消-TransactionCoordinator

事务取消除了由Producer触发外,还有可能由Coordinator触发,例如“事务超时”,Coordinator有个定时器,定时扫描那些已经超时的事务

8、示例代码

8.1 producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class TransactionProducer {

    private static Map<String, Object> getProducerProps(){
        Map<String, Object> props = new HashMap<>();
        // ZK 服务地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 批量发送的大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100);
        // 发送频率,满足任务一个条件发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        // 缓冲区大小,根据本机内存大小配置
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 客户端ID,便于统计
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
        // 事务ID
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
        // 启用幂等性
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 要求ISR都确认
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // 只读取已提交的消息
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // 创建 KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());
        // 初始化事务
        producer.initTransactions();
        try {
            Thread.sleep(2000);
            // 开启事务
            producer.beginTransaction();
            // 发送消息到 tp_tx_01
            producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_01"));
            // 发送消息到 tp_tx_02
            producer.send(new ProducerRecord<>("tp_tx_02", "tx_msg_02"));
            // 提交事务
            producer.commitTransaction();
        } catch (Exception e){
            // 终止事务
            producer.abortTransaction();
        } finally {
            // 关闭 KafkaProducer
            producer.close();
        }
    }
}

8.2 producer+consumer

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TransactionProducerAndConsumer {

    private static Map<String, Object> getProducerProps(){
        Map<String, Object> props = new HashMap<>();
        // ZK 服务地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 批量发送的大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100);
        // 发送频率,满足任务一个条件发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        // 缓冲区大小,根据本机内存大小配置
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 客户端ID,便于统计
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
        // 事务ID
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
        // 启用幂等性
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 要求ISR都确认
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // 只读取已提交的消息
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    public static Map<String, Object> getConsumerProps(String consumerGroupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 设置消费组ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
        // 不启用消费者偏移量的自动确认,也不要手动确认
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 只读取已提交的消息
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        return props;
    }

    public static void main(String[] args) {
        // 创建 KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());
        // 创建 KafkaConsumer
        String consumerGroupId = "consumer_grp_id_101";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps(consumerGroupId));
        // 事务的初始化
        producer.initTransactions();
        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_tx_01"));
        final ConsumerRecords<String, String> records = consumer.poll(1_000);
        // 开启事务
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                producer.send(new ProducerRecord<>("tp_tx_out_01", record.key(), record.value()));
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                // 偏 移量表示下一条要消费的消息
            }
            // 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)
            producer.sendOffsetsToTransaction(offsets, consumerGroupId);
            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 回滚事务
            producer.abortTransaction();
        } finally {
            // 关闭资源
            producer.close();
            consumer.close();
        }
    }
}
作者:Jeebiz  创建时间:2025-02-21 14:36
最后编辑:Jeebiz  更新时间:2025-02-22 15:04