package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.class */
/**
 * {@link KafkaProduceSender} that writes records through a transactional
 * {@link KafkaInternalProducer}.
 *
 * <p>A fresh producer is created and initialised for transactions on every call to
 * {@link #beginTransaction(String)}. {@link #send(ProducerRecord)}, {@link #prepareCommit()} and
 * {@link #abortTransaction()} all operate on that current producer and therefore require
 * {@code beginTransaction} to have been called first.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaTransactionSender.class);

    /** Producer of the current transaction; {@code null} until {@link #beginTransaction(String)} is called. */
    private KafkaInternalProducer<K, V> kafkaProducer;

    /** Transactional id passed to the most recent {@link #beginTransaction(String)} call. */
    private String transactionId;

    /** Prefix handed to {@code KafkaSinkWriter.generateTransactionId} when probing transaction ids. */
    private final String transactionPrefix;

    /** Base producer configuration; cloned (never mutated) for each producer that is created. */
    private final Properties kafkaProperties;

    /**
     * Creates a sender; no producer is created until a transaction is begun or aborted.
     *
     * @param str transaction id prefix
     * @param properties base Kafka producer configuration
     */
    public KafkaTransactionSender(String str, Properties properties) {
        this.transactionPrefix = str;
        this.kafkaProperties = properties;
    }

    /**
     * Sends the record with the current transactional producer.
     *
     * @param producerRecord record to send
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender
    public void send(ProducerRecord<K, V> producerRecord) {
        this.kafkaProducer.send(producerRecord);
    }

    /**
     * Creates a new transactional producer for the given id, makes it the current producer and
     * begins a transaction on it.
     *
     * <p>NOTE(review): the previously held producer (if any) is replaced without being closed
     * here. Whether that is intentional cannot be determined from this file - confirm against
     * the committer before changing it.
     *
     * @param str transactional id of the new transaction
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender
    public void beginTransaction(String str) {
        this.transactionId = str;
        this.kafkaProducer = getTransactionProducer(this.kafkaProperties, str);
        this.kafkaProducer.beginTransaction();
    }

    /**
     * Describes the current transaction so that it can be committed elsewhere.
     *
     * @return a non-empty {@link Optional} holding the current transaction id, the producer
     *     configuration, and the current producer's id and epoch
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender
    public Optional<KafkaCommitInfo> prepareCommit() {
        KafkaCommitInfo commitInfo =
                new KafkaCommitInfo(
                        this.transactionId,
                        this.kafkaProperties,
                        this.kafkaProducer.getProducerId(),
                        this.kafkaProducer.getEpoch());
        return Optional.of(commitInfo);
    }

    /** Aborts the transaction of the current producer. */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender
    public void abortTransaction() {
        this.kafkaProducer.abortTransaction();
    }

    /**
     * Walks the transaction ids generated for checkpoint {@code j}, {@code j + 1}, ... and, for
     * each one, points a producer at that id and flushes it. The walk stops at the first id for
     * which the producer reports epoch {@code 0}.
     *
     * <p>The current producer is reused when one exists. Otherwise a temporary producer is
     * created for the duration of the call and closed before returning, so that it is not
     * leaked.
     *
     * <p>NOTE(review): epoch {@code 0} presumably means that no transaction was ever opened
     * under that id - confirm against {@code KafkaInternalProducer}.
     *
     * @param j first checkpoint id whose transaction id is probed
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender
    public void abortTransaction(long j) {
        // Only a producer created by this call is owned by it; the shared one is closed by close().
        boolean temporaryProducer = this.kafkaProducer == null;
        KafkaInternalProducer<K, V> transactionProducer =
                temporaryProducer
                        ? getTransactionProducer(
                                this.kafkaProperties,
                                KafkaSinkWriter.generateTransactionId(this.transactionPrefix, j))
                        : this.kafkaProducer;
        try {
            for (long checkpointId = j; ; checkpointId++) {
                String generateTransactionId =
                        KafkaSinkWriter.generateTransactionId(this.transactionPrefix, checkpointId);
                transactionProducer.setTransactionalId(generateTransactionId);
                if (log.isDebugEnabled()) {
                    log.debug("Abort kafka transaction: {}", generateTransactionId);
                }
                transactionProducer.flush();
                if (transactionProducer.getEpoch() == 0) {
                    return;
                }
            }
        } finally {
            if (temporaryProducer) {
                transactionProducer.close();
            }
        }
    }

    /**
     * Captures the sender state for a checkpoint.
     *
     * @param j checkpoint id stored in the state
     * @return a mutable single-element list holding the current transaction id, the transaction
     *     prefix, the checkpoint id and the producer configuration
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender
    public List<KafkaSinkState> snapshotState(long j) {
        return Lists.newArrayList(new KafkaSinkState[]{new KafkaSinkState(this.transactionId, this.transactionPrefix, j, this.kafkaProperties)});
    }

    /**
     * Flushes and closes the current producer, if there is one. The producer is closed even when
     * the flush fails.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.kafkaProducer == null) {
            return;
        }
        try {
            this.kafkaProducer.flush();
        } finally {
            this.kafkaProducer.close();
        }
    }

    /**
     * Builds a producer bound to the given transactional id and initialises it for transactions.
     *
     * @param properties base configuration; cloned so the caller's instance is left untouched
     * @param str transactional id to configure on the producer
     * @return the initialised producer; the caller is responsible for closing it
     */
    private KafkaInternalProducer<K, V> getTransactionProducer(Properties properties, String str) {
        Properties transactionProperties = (Properties) properties.clone();
        transactionProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, str);
        KafkaInternalProducer<K, V> transactionProducer =
                new KafkaInternalProducer<>(transactionProperties, str);
        transactionProducer.initTransactions();
        return transactionProducer;
    }
}
