package org.apache.kafka.connect.runtime;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSourceTask.class */
public class WorkerSourceTask extends AbstractWorkerSourceTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
    private volatile SubmittedRecords.CommittableOffsets committableOffsets;
    private final SubmittedRecords submittedRecords;
    private final AtomicReference<Exception> producerSendException;

    public WorkerSourceTask(ConnectorTaskId connectorTaskId, SourceTask sourceTask, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, TransformationChain<SourceRecord> transformationChain, Producer<byte[], byte[]> producer, TopicAdmin topicAdmin, Map<String, TopicCreationGroup> map, CloseableOffsetStorageReader closeableOffsetStorageReader, OffsetStorageWriter offsetStorageWriter, ConnectorOffsetBackingStore connectorOffsetBackingStore, WorkerConfig workerConfig, ClusterConfigState clusterConfigState, ConnectMetrics connectMetrics, ClassLoader classLoader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, Executor executor) {
        super(connectorTaskId, sourceTask, listener, targetState, converter, converter2, headerConverter, transformationChain, new WorkerSourceTaskContext(closeableOffsetStorageReader, connectorTaskId, clusterConfigState, null), producer, topicAdmin, map, closeableOffsetStorageReader, offsetStorageWriter, connectorOffsetBackingStore, workerConfig, connectMetrics, classLoader, time, retryWithToleranceOperator, statusBackingStore, executor);
        this.committableOffsets = SubmittedRecords.CommittableOffsets.EMPTY;
        this.submittedRecords = new SubmittedRecords();
        this.producerSendException = new AtomicReference<>();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void prepareToInitializeTask() {
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void prepareToEnterSendLoop() {
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void beginSendIteration() {
        updateCommittableOffsets();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void prepareToPollTask() {
        maybeThrowProducerSendException();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void recordDropped(SourceRecord sourceRecord) {
        commitTaskRecord(sourceRecord, null);
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord) {
        maybeThrowProducerSendException();
        return Optional.of(this.submittedRecords.submit(sourceRecord));
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void recordDispatched(SourceRecord sourceRecord) {
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void batchDispatched() {
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void recordSent(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata) {
        commitTaskRecord(sourceRecord, recordMetadata);
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void producerSendFailed(boolean z, ProducerRecord<byte[], byte[]> producerRecord, SourceRecord sourceRecord, Exception exc) {
        if (z) {
            throw new ConnectException("Unrecoverable exception trying to send", exc);
        }
        String str = producerRecord.topic();
        if (this.retryWithToleranceOperator.getErrorToleranceType() != ToleranceType.ALL) {
            this.producerSendException.compareAndSet(null, exc);
            return;
        }
        log.trace("Ignoring failed record send: {} failed to send record to {}: ", new Object[]{this, str, exc});
        this.retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, sourceRecord, exc);
        commitTaskRecord(sourceRecord, null);
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void finalOffsetCommit(boolean z) {
        this.submittedRecords.awaitAllMessages(this.workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG).longValue(), TimeUnit.MILLISECONDS);
        updateCommittableOffsets();
        commitOffsets();
    }

    public boolean commitOffsets() {
        SubmittedRecords.CommittableOffsets committableOffsets;
        long longValue = this.workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG).longValue();
        log.debug("{} Committing offsets", this);
        long milliseconds = this.time.milliseconds();
        long j = milliseconds + longValue;
        synchronized (this) {
            committableOffsets = this.committableOffsets;
            this.committableOffsets = SubmittedRecords.CommittableOffsets.EMPTY;
        }
        if (committableOffsets.isEmpty()) {
            log.debug("{} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors.", this);
        } else {
            log.info("{} Committing offsets for {} acknowledged messages", this, Integer.valueOf(committableOffsets.numCommittableMessages()));
            if (committableOffsets.hasPending()) {
                log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. The source partition with the most pending messages is {}, with {} pending messages", new Object[]{this, Integer.valueOf(committableOffsets.numUncommittableMessages()), Integer.valueOf(committableOffsets.numDeques()), committableOffsets.largestDequePartition(), Integer.valueOf(committableOffsets.largestDequeSize())});
            } else {
                log.debug("{} There are currently no pending messages for this offset commit; all messages dispatched to the task's producer since the last commit have been acknowledged", this);
            }
        }
        Map<Map<String, Object>, Map<String, Object>> offsets = committableOffsets.offsets();
        OffsetStorageWriter offsetStorageWriter = this.offsetWriter;
        offsetStorageWriter.getClass();
        offsets.forEach(offsetStorageWriter::offset);
        try {
            if (!this.offsetWriter.beginFlush(j - this.time.milliseconds(), TimeUnit.MILLISECONDS)) {
                long milliseconds2 = this.time.milliseconds() - milliseconds;
                recordCommitSuccess(milliseconds2);
                log.debug("{} Finished offset commitOffsets successfully in {} ms", this, Long.valueOf(milliseconds2));
                commitSourceTask();
                return true;
            }
            Future<Void> doFlush = this.offsetWriter.doFlush((th, r7) -> {
                if (th != null) {
                    log.error("{} Failed to flush offsets to storage: ", this, th);
                } else {
                    log.trace("{} Finished flushing offsets to storage", this);
                }
            });
            if (doFlush == null) {
                this.offsetWriter.cancelFlush();
                recordCommitFailure(this.time.milliseconds() - milliseconds, null);
                return false;
            }
            try {
                doFlush.get(Math.max(j - this.time.milliseconds(), 0L), TimeUnit.MILLISECONDS);
                long milliseconds3 = this.time.milliseconds() - milliseconds;
                recordCommitSuccess(milliseconds3);
                log.debug("{} Finished commitOffsets successfully in {} ms", this, Long.valueOf(milliseconds3));
                commitSourceTask();
                return true;
            } catch (InterruptedException e) {
                log.warn("{} Flush of offsets interrupted, cancelling", this);
                this.offsetWriter.cancelFlush();
                recordCommitFailure(this.time.milliseconds() - milliseconds, e);
                return false;
            } catch (ExecutionException e2) {
                log.error("{} Flush of offsets threw an unexpected exception: ", this, e2);
                this.offsetWriter.cancelFlush();
                recordCommitFailure(this.time.milliseconds() - milliseconds, e2);
                return false;
            } catch (TimeoutException e3) {
                log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
                this.offsetWriter.cancelFlush();
                recordCommitFailure(this.time.milliseconds() - milliseconds, null);
                return false;
            }
        } catch (InterruptedException e4) {
            log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this);
            recordCommitFailure(this.time.milliseconds() - milliseconds, e4);
            return false;
        } catch (TimeoutException e5) {
            log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this);
            recordCommitFailure(this.time.milliseconds() - milliseconds, e5);
            return false;
        }
    }

    private void updateCommittableOffsets() {
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        synchronized (this) {
            this.committableOffsets = this.committableOffsets.updatedWith(committableOffsets);
        }
    }

    private void maybeThrowProducerSendException() {
        if (this.producerSendException.get() != null) {
            throw new ConnectException("Unrecoverable exception from producer send callback", this.producerSendException.get());
        }
    }

    public String toString() {
        return "WorkerSourceTask{id=" + this.id + '}';
    }
}
