package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.class */
/**
 * A {@link KafkaProducer} that reaches into the producer's internal {@code transactionManager}
 * via reflection so that the transactional id can be switched on a live producer and a
 * transaction can be resumed from a known producer id / epoch pair.
 *
 * <p>Because the implementation depends on private Kafka client fields and methods
 * ({@code transactionManager}, {@code producerIdAndEpoch}, {@code topicPartitionBookkeeper},
 * {@code transitionTo}, ...), a mismatching Kafka client version surfaces as a
 * {@link KafkaConnectorException}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaInternalProducer.class);
    private static final String TRANSACTION_MANAGER_STATE_ENUM = "org.apache.kafka.clients.producer.internals.TransactionManager$State";
    private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";

    /** Transactional id this wrapper currently considers active. */
    private String transactionalId;

    /**
     * Creates the producer.
     *
     * @param properties producer configuration handed to {@link KafkaProducer}
     * @param transactionalId transactional id tracked by this wrapper
     */
    public KafkaInternalProducer(Properties properties, String transactionalId) {
        super(properties);
        this.transactionalId = transactionalId;
    }

    /**
     * Re-applies the tracked transactional id and then initializes transactions on the
     * underlying producer.
     */
    @Override
    public void initTransactions() {
        // With the equality short-circuit in setTransactionalId this call does not touch the
        // transaction manager when the id is unchanged; it is kept to preserve the call order.
        setTransactionalId(this.transactionalId);
        super.initTransactions();
    }

    /** Logs the tracked transactional id at debug level and begins a transaction. */
    @Override
    public void beginTransaction() throws ProducerFencedException {
        log.debug("KafkaInternalProducer.beginTransaction. {}", this.transactionalId);
        super.beginTransaction();
    }

    /** Logs the tracked transactional id at debug level and commits the transaction. */
    @Override
    public void commitTransaction() throws ProducerFencedException {
        log.debug("KafkaInternalProducer.commitTransaction.{}", this.transactionalId);
        super.commitTransaction();
    }

    /** Logs the tracked transactional id at debug level and aborts the transaction. */
    @Override
    public void abortTransaction() throws ProducerFencedException {
        log.debug("KafkaInternalProducer.abortTransaction. {}", this.transactionalId);
        super.abortTransaction();
    }

    /**
     * Switches the producer to another transactional id.
     *
     * <p>When the id differs from the tracked one, the {@code transactionalId} and
     * {@code currentState} fields of the internal transaction manager are overwritten (the state
     * becomes {@code UNINITIALIZED}) while holding the transaction manager's monitor. When the
     * id is unchanged nothing is modified.
     *
     * @param transactionalId the id to switch to; must not be {@code null}
     */
    public void setTransactionalId(String transactionalId) {
        log.debug("KafkaInternalProducer.setTransactionalId. Target transactionalId={}", transactionalId);
        if (transactionalId.equals(this.transactionalId)) {
            return;
        }
        log.debug(
                "KafkaInternalProducer.setTransactionalId. Current transactionalId={} not match target transactionalId={}",
                this.transactionalId,
                transactionalId);
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            ReflectionUtils.setField(transactionManager, "transactionalId", transactionalId);
            ReflectionUtils.setField(transactionManager, "currentState", getTransactionManagerState("UNINITIALIZED"));
            this.transactionalId = transactionalId;
        }
    }

    /**
     * Reads the {@code epoch} member of the transaction manager's {@code producerIdAndEpoch}.
     *
     * @return the current producer epoch
     * @throws KafkaConnectorException if the expected internal fields are not present
     */
    public short getEpoch() {
        return (Short) readProducerIdAndEpochMember("epoch");
    }

    /**
     * Reads the {@code producerId} member of the transaction manager's
     * {@code producerIdAndEpoch}.
     *
     * @return the current producer id
     * @throws KafkaConnectorException if the expected internal fields are not present
     */
    public long getProducerId() {
        return (Long) readProducerIdAndEpochMember("producerId");
    }

    /**
     * Forces the internal transaction manager into an in-transaction state for the given
     * producer id and epoch.
     *
     * @param producerId producer id to install
     * @param epoch producer epoch to install
     * @param txnStarted value written to the transaction manager's {@code transactionStarted}
     *     field
     * @throws KafkaConnectorException if the expected Kafka internals are not present
     */
    public void resumeTransaction(long producerId, short epoch, boolean txnStarted) {
        log.info(
                "Attempting to resume transaction {} with producerId {} and epoch {}",
                this.transactionalId,
                producerId,
                epoch);
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            // Resolve the bookkeeper before any state transition so that a missing field fails
            // without having mutated the transaction manager.
            Object partitionBookkeeper =
                    requirePresent(
                            ReflectionUtils.getField(
                                    transactionManager, transactionManager.getClass(), "topicPartitionBookkeeper"),
                            "topicPartitionBookkeeper");
            // The order of the following steps is significant and mirrors the original sequence.
            transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
            ReflectionUtils.invoke(partitionBookkeeper, "reset", new Object[0]);
            ReflectionUtils.setField(
                    transactionManager, PRODUCER_ID_AND_EPOCH_FIELD_NAME, createProducerIdAndEpoch(producerId, epoch));
            transitionTransactionManagerStateTo(transactionManager, "READY");
            transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
            ReflectionUtils.setField(transactionManager, "transactionStarted", txnStarted);
        }
    }

    /**
     * Reads the transaction manager's {@code transactionStarted} field.
     *
     * @return the current value of {@code transactionStarted}
     * @throws KafkaConnectorException if the field is not present
     */
    public boolean isTxnStarted() {
        return (Boolean)
                requirePresent(
                        ReflectionUtils.getField(getTransactionManager(), "transactionStarted"),
                        "transactionStarted");
    }

    /**
     * Instantiates the type of {@code TransactionManager.producerIdAndEpoch} through its
     * {@code (long, short)} constructor.
     */
    private static Object createProducerIdAndEpoch(long producerId, short epoch) {
        try {
            Class<?> producerIdAndEpochType =
                    TransactionManager.class.getDeclaredField(PRODUCER_ID_AND_EPOCH_FIELD_NAME).getType();
            Constructor<?> constructor = producerIdAndEpochType.getDeclaredConstructor(Long.TYPE, Short.TYPE);
            constructor.setAccessible(true);
            return constructor.newInstance(producerId, epoch);
        } catch (IllegalAccessException | InstantiationException | NoSuchFieldException | NoSuchMethodException | InvocationTargetException e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE, "Incompatible KafkaProducer version", e);
        }
    }

    /** Returns the {@code transactionManager} field declared on {@link KafkaProducer}. */
    private Object getTransactionManager() {
        Optional<?> transactionManager = ReflectionUtils.getField(this, KafkaProducer.class, "transactionManager");
        if (transactionManager.isPresent()) {
            return transactionManager.get();
        }
        throw new KafkaConnectorException(KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED, "Can't get transactionManager in KafkaProducer");
    }

    /** Reads one member of the transaction manager's {@code producerIdAndEpoch} object. */
    private Object readProducerIdAndEpochMember(String memberName) {
        Object producerIdAndEpoch =
                requirePresent(
                        ReflectionUtils.getField(getTransactionManager(), PRODUCER_ID_AND_EPOCH_FIELD_NAME),
                        PRODUCER_ID_AND_EPOCH_FIELD_NAME);
        return requirePresent(ReflectionUtils.getField(producerIdAndEpoch, memberName), memberName);
    }

    /**
     * Unwraps a reflective field lookup, reporting an absent value as a version
     * incompatibility instead of a context-free {@code NoSuchElementException}.
     */
    private static Object requirePresent(Optional<?> value, String fieldName) {
        if (value.isPresent()) {
            return value.get();
        }
        throw new KafkaConnectorException(
                KafkaConnectorErrorCode.VERSION_INCOMPATIBLE,
                "Incompatible KafkaProducer version, can't get field: " + fieldName);
    }

    /** Invokes {@code transitionTo(state)} on the transaction manager for the named state. */
    private static void transitionTransactionManagerStateTo(Object transactionManager, String stateName) {
        ReflectionUtils.invoke(transactionManager, "transitionTo", new Object[] {getTransactionManagerState(stateName)});
    }

    /**
     * Looks up a constant of the transaction manager's state enum by name.
     *
     * @throws KafkaConnectorException if the state enum class cannot be loaded
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Enum<?> getTransactionManagerState(String stateName) {
        try {
            // The enum type is only known by name at runtime, so a raw cast is required to
            // satisfy Enum.valueOf's self-referential type bound.
            Class<Enum> stateClass = (Class<Enum>) Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
            return Enum.valueOf(stateClass, stateName);
        } catch (ClassNotFoundException e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE, "Incompatible KafkaProducer version", e);
        }
    }
}
