package org.apache.hudi.connect.transaction;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/connect/transaction/ConnectTransactionParticipant.class */
public class ConnectTransactionParticipant implements TransactionParticipant {
    private static final Logger LOG = LogManager.getLogger(ConnectTransactionParticipant.class);
    private final LinkedList<SinkRecord> buffer;
    private final BlockingQueue<ControlMessage> controlEvents;
    private final TopicPartition partition;
    private final SinkTaskContext context;
    private final KafkaControlAgent kafkaControlAgent;
    private final ConnectWriterProvider<WriteStatus> writerProvider;
    private TransactionInfo<WriteStatus> ongoingTransactionInfo;
    private long committedKafkaOffset;

    public ConnectTransactionParticipant(KafkaConnectConfigs kafkaConnectConfigs, TopicPartition topicPartition, KafkaControlAgent kafkaControlAgent, SinkTaskContext sinkTaskContext) throws HoodieException {
        this(topicPartition, kafkaControlAgent, sinkTaskContext, new KafkaConnectWriterProvider(kafkaConnectConfigs, topicPartition));
    }

    public ConnectTransactionParticipant(TopicPartition topicPartition, KafkaControlAgent kafkaControlAgent, SinkTaskContext sinkTaskContext, ConnectWriterProvider<WriteStatus> connectWriterProvider) throws HoodieException {
        this.buffer = new LinkedList<>();
        this.controlEvents = new LinkedBlockingQueue();
        this.partition = topicPartition;
        this.context = sinkTaskContext;
        this.writerProvider = connectWriterProvider;
        this.kafkaControlAgent = kafkaControlAgent;
        this.ongoingTransactionInfo = null;
        this.committedKafkaOffset = 0L;
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public void start() {
        LOG.info("Start Hudi Transaction Participant for partition " + this.partition.partition());
        this.kafkaControlAgent.registerTransactionParticipant(this);
        this.context.pause(new TopicPartition[]{this.partition});
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public void stop() {
        this.kafkaControlAgent.deregisterTransactionParticipant(this);
        cleanupOngoingTransaction();
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public void buffer(SinkRecord sinkRecord) {
        this.buffer.add(sinkRecord);
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public void processControlEvent(ControlMessage controlMessage) {
        this.controlEvents.add(controlMessage);
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public long getLastKafkaCommittedOffset() {
        return this.committedKafkaOffset;
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public TopicPartition getPartition() {
        return this.partition;
    }

    @Override // org.apache.hudi.connect.transaction.TransactionParticipant
    public void processRecords() {
        while (!this.controlEvents.isEmpty()) {
            ControlMessage poll = this.controlEvents.poll();
            switch (poll.getType()) {
                case START_COMMIT:
                    handleStartCommit(poll);
                    break;
                case END_COMMIT:
                    handleEndCommit(poll);
                    break;
                case ACK_COMMIT:
                    handleAckCommit(poll);
                    break;
                case WRITE_STATUS:
                    break;
                default:
                    throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + poll.getType().name());
            }
        }
        writeRecords();
    }

    private void handleStartCommit(ControlMessage controlMessage) {
        cleanupOngoingTransaction();
        syncKafkaOffsetWithLeader(controlMessage);
        this.context.resume(new TopicPartition[]{this.partition});
        String commitTime = controlMessage.getCommitTime();
        LOG.info("Started a new transaction after receiving START_COMMIT for commit " + commitTime);
        try {
            this.ongoingTransactionInfo = new TransactionInfo<>(commitTime, this.writerProvider.getWriter2(commitTime));
            this.ongoingTransactionInfo.setExpectedKafkaOffset(this.committedKafkaOffset);
        } catch (Exception e) {
            LOG.warn("Error received while starting a new transaction", e);
        }
    }

    private void handleEndCommit(ControlMessage controlMessage) {
        if (this.ongoingTransactionInfo == null) {
            LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", controlMessage.getCommitTime()));
            return;
        }
        if (!this.ongoingTransactionInfo.getCommitTime().equals(controlMessage.getCommitTime())) {
            LOG.error(String.format("Fatal error received END_COMMIT with commit time %s while local transaction commit time %s", controlMessage.getCommitTime(), this.ongoingTransactionInfo.getCommitTime()));
            cleanupOngoingTransaction();
            syncKafkaOffsetWithLeader(controlMessage);
        } else {
            this.context.pause(new TopicPartition[]{this.partition});
            this.ongoingTransactionInfo.commitInitiated();
            try {
                this.kafkaControlAgent.publishMessage(ControlMessage.newBuilder().setProtocolVersion(0).setType(ControlMessage.EventType.WRITE_STATUS).setTopicName(this.partition.topic()).setSenderType(ControlMessage.EntityType.PARTICIPANT).setSenderPartition(this.partition.partition()).setReceiverType(ControlMessage.EntityType.COORDINATOR).setReceiverPartition(0).setCommitTime(this.ongoingTransactionInfo.getCommitTime()).setParticipantInfo(ControlMessage.ParticipantInfo.newBuilder().setWriteStatus(KafkaConnectUtils.buildWriteStatuses(this.ongoingTransactionInfo.getWriter().close())).setKafkaOffset(this.ongoingTransactionInfo.getExpectedKafkaOffset()).build()).build());
            } catch (Exception e) {
                LOG.error(String.format("Error writing records and ending commit %s for partition %s", controlMessage.getCommitTime(), Integer.valueOf(this.partition.partition())), e);
                throw new HoodieIOException(String.format("Error writing records and ending commit %s for partition %s", controlMessage.getCommitTime(), Integer.valueOf(this.partition.partition())), new IOException(e));
            }
        }
    }

    private void handleAckCommit(ControlMessage controlMessage) {
        if (this.ongoingTransactionInfo != null && this.committedKafkaOffset < this.ongoingTransactionInfo.getExpectedKafkaOffset()) {
            this.committedKafkaOffset = this.ongoingTransactionInfo.getExpectedKafkaOffset();
        }
        syncKafkaOffsetWithLeader(controlMessage);
        cleanupOngoingTransaction();
    }

    private void writeRecords() {
        if (this.ongoingTransactionInfo == null || this.ongoingTransactionInfo.isCommitInitiated()) {
            return;
        }
        while (!this.buffer.isEmpty()) {
            try {
                SinkRecord peek = this.buffer.peek();
                if (peek != null && peek.kafkaOffset() == this.ongoingTransactionInfo.getExpectedKafkaOffset()) {
                    this.ongoingTransactionInfo.getWriter().writeRecord(peek);
                    this.ongoingTransactionInfo.setExpectedKafkaOffset(peek.kafkaOffset() + 1);
                } else if (peek != null && peek.kafkaOffset() > this.ongoingTransactionInfo.getExpectedKafkaOffset()) {
                    LOG.warn(String.format("Received a kafka record with offset %s above the next expected kafka offset %s for partition %s, hence resetting the kafka offset to %s", Long.valueOf(peek.kafkaOffset()), Long.valueOf(this.ongoingTransactionInfo.getExpectedKafkaOffset()), this.partition, Long.valueOf(this.ongoingTransactionInfo.getExpectedKafkaOffset())));
                    this.context.offset(this.partition, this.ongoingTransactionInfo.getExpectedKafkaOffset());
                } else if (peek != null && peek.kafkaOffset() < this.ongoingTransactionInfo.getExpectedKafkaOffset()) {
                    LOG.warn(String.format("Received a kafka record with offset %s below the next expected kafka offset %s for partition %s, no action will be taken but this record will be ignored since its already written", Long.valueOf(peek.kafkaOffset()), Long.valueOf(this.ongoingTransactionInfo.getExpectedKafkaOffset()), this.partition));
                }
                this.buffer.poll();
            } catch (Exception e) {
                LOG.warn(String.format("Error received while writing records for transaction %s in partition %s", this.ongoingTransactionInfo.getCommitTime(), Integer.valueOf(this.partition.partition())), e);
            }
        }
    }

    private void cleanupOngoingTransaction() {
        if (this.ongoingTransactionInfo != null) {
            try {
                this.ongoingTransactionInfo.getWriter().close();
                this.ongoingTransactionInfo = null;
            } catch (HoodieIOException e) {
                LOG.warn("Error received while trying to cleanup existing transaction", e);
            }
        }
    }

    private void syncKafkaOffsetWithLeader(ControlMessage controlMessage) {
        if (controlMessage.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().containsKey(Integer.valueOf(this.partition.partition()))) {
            Long l = controlMessage.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().get(Integer.valueOf(this.partition.partition()));
            if (l != null && l.longValue() >= 0) {
                if (l.longValue() != this.committedKafkaOffset) {
                    LOG.warn(String.format("The coordinator offset for kafka partition %s is %d while the locally committed offset is %d, hence resetting the local committed offset to the coordinator provided one to ensure consistency", this.partition, l, Long.valueOf(this.committedKafkaOffset)));
                }
                this.committedKafkaOffset = l.longValue();
                return;
            }
        } else {
            LOG.warn(String.format("The coordinator offset for kafka partition %s is not present while the locally committed offset is %d, hence resetting the local committed offset to 0 to avoid data loss", this.partition, Long.valueOf(this.committedKafkaOffset)));
        }
        this.committedKafkaOffset = 0L;
    }
}
