package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceTask.class */
public class MirrorSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
    private KafkaConsumer<byte[], byte[]> consumer;
    private KafkaProducer<byte[], byte[]> offsetProducer;
    private String sourceClusterAlias;
    private String offsetSyncsTopic;
    private Duration pollTimeout;
    private long maxOffsetLag;
    private Map<TopicPartition, PartitionState> partitionStates;
    private ReplicationPolicy replicationPolicy;
    private MirrorSourceMetrics metrics;
    private boolean stopping;
    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs;
    private Semaphore outstandingOffsetSyncs;
    private Semaphore consumerAccess;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceTask$PartitionState.class */
    public static class PartitionState {
        /** Upstream offset passed to the most recent {@link #update(long, long)} call, or -1 before the first call. */
        long previousUpstreamOffset = -1;
        /** Downstream offset passed to the most recent {@link #update(long, long)} call, or -1 before the first call. */
        long previousDownstreamOffset = -1;
        /** Downstream offset recorded the last time a sync was judged necessary, or -1 if that never happened. */
        long lastSyncDownstreamOffset = -1;
        /** Threshold used by the lag check in {@link #update(long, long)}. */
        long maxOffsetLag;
        /** Sticky flag: set by {@link #update(long, long)}, cleared only by {@link #reset()}. */
        boolean shouldSyncOffsets;

        /**
         * @param j the maximum offset lag to tolerate before a sync is requested
         */
        PartitionState(long j) {
            this.maxOffsetLag = j;
        }

        /**
         * Records a new upstream/downstream offset pair and reports whether an offset sync is due.
         *
         * @param j  the upstream offset of the record
         * @param j2 the downstream offset of the record
         * @return the current value of {@code shouldSyncOffsets}; it stays {@code true} across calls
         *         until {@link #reset()} is invoked
         */
        boolean update(long j, long j2) {
            if (syncRequired(j, j2)) {
                this.lastSyncDownstreamOffset = j2;
                this.shouldSyncOffsets = true;
            }
            this.previousUpstreamOffset = j;
            this.previousDownstreamOffset = j2;
            return this.shouldSyncOffsets;
        }

        /** Evaluates the four sync triggers against the state left by the previous update. */
        private boolean syncRequired(long j, long j2) {
            // Nothing has been synced yet.
            if (this.lastSyncDownstreamOffset == -1) {
                return true;
            }
            // Downstream has advanced at least maxOffsetLag past the last synced offset.
            if (j2 - (this.lastSyncDownstreamOffset + 1) >= this.maxOffsetLag) {
                return true;
            }
            // Upstream offset is not exactly one greater than the previous one.
            if (j - this.previousUpstreamOffset != 1) {
                return true;
            }
            // Downstream offset moved backwards.
            return j2 < this.previousDownstreamOffset;
        }

        /** Clears the pending-sync flag; offsets remembered by earlier updates are kept. */
        void reset() {
            this.shouldSyncOffsets = false;
        }
    }

    /**
     * Creates a task with no collaborators attached; the remaining fields are populated by
     * {@link #start(Map)}.
     */
    public MirrorSourceTask() {
        this.pendingOffsetSyncs = new LinkedHashMap<>();
        this.stopping = false;
    }

    /**
     * Creates a task with its collaborators supplied directly instead of being built from
     * configuration in {@link #start(Map)}.
     *
     * @param kafkaConsumer       consumer the task polls records from
     * @param mirrorSourceMetrics metrics sink
     * @param str                 source cluster alias
     * @param replicationPolicy   policy used to derive remote topic names
     * @param j                   maximum offset lag handed to each {@link PartitionState}
     * @param kafkaProducer       producer used to publish offset syncs
     * @param semaphore           permits limiting in-flight offset syncs
     * @param map                 per-partition sync state
     * @param str2                name of the offset-syncs topic
     */
    MirrorSourceTask(KafkaConsumer<byte[], byte[]> kafkaConsumer, MirrorSourceMetrics mirrorSourceMetrics, String str, ReplicationPolicy replicationPolicy, long j, KafkaProducer<byte[], byte[]> kafkaProducer, Semaphore semaphore, Map<TopicPartition, PartitionState> map, String str2) {
        this.pendingOffsetSyncs = new LinkedHashMap<>();
        this.stopping = false;

        // Source side.
        this.consumer = kafkaConsumer;
        this.consumerAccess = new Semaphore(1);
        this.sourceClusterAlias = str;
        this.replicationPolicy = replicationPolicy;
        this.metrics = mirrorSourceMetrics;

        // Offset-sync side.
        this.offsetProducer = kafkaProducer;
        this.offsetSyncsTopic = str2;
        this.outstandingOffsetSyncs = semaphore;
        this.partitionStates = map;
        this.maxOffsetLag = j;
    }

    /**
     * Initializes the task from its configuration: builds the source consumer and the
     * offset-sync producer, assigns the configured topic-partitions to the consumer, and seeks
     * each one to the offset computed by {@code loadOffsets}.
     *
     * @param map the task configuration properties
     */
    @Override
    public void start(Map<String, String> map) {
        MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(map);
        this.pendingOffsetSyncs.clear();
        this.outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        this.consumerAccess = new Semaphore(1);
        this.sourceClusterAlias = config.sourceClusterAlias();
        this.metrics = config.metrics();
        this.pollTimeout = config.consumerPollTimeout();
        this.maxOffsetLag = config.maxOffsetLag();
        this.replicationPolicy = config.replicationPolicy();
        this.partitionStates = new HashMap<>();
        this.offsetSyncsTopic = config.offsetSyncsTopic();
        this.consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig());
        this.offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());

        Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
        Map<TopicPartition, Long> initialOffsets = loadOffsets(taskTopicPartitions);
        this.consumer.assign(initialOffsets.keySet());

        long uncommittedCount = initialOffsets.values().stream()
                .filter(offset -> offset.longValue() == 0L)
                .count();
        log.info("Starting with {} previously uncommitted partitions.", uncommittedCount);
        log.trace("Seeking offsets: {}", initialOffsets);

        // Position the consumer on every assigned partition before the first poll.
        initialOffsets.forEach((partition, offset) -> this.consumer.seek(partition, offset.longValue()));

        log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
                taskTopicPartitions.size(), this.sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
    }

    /**
     * Attempts to dispatch any offset syncs that are still queued.
     */
    @Override
    public void commit() {
        this.firePendingOffsetSyncs();
    }

    /**
     * Stops the task: flags it as stopping, wakes the consumer, waits for exclusive consumer
     * access, then closes the consumer, the offset producer and the metrics.
     *
     * <p>The consumer-access permit acquired here is intentionally never released, so later
     * {@link #poll()} calls fail their {@code tryAcquire} and return immediately.
     *
     * <p>If the wait for consumer access is interrupted, the resources are still closed and the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void stop() {
        long startedAt = System.currentTimeMillis();
        this.stopping = true;
        this.consumer.wakeup();

        boolean interrupted = false;
        try {
            this.consumerAccess.acquire();
        } catch (InterruptedException e) {
            log.warn("Interrupted waiting for access to consumer. Will try closing anyway.");
            interrupted = true;
        }

        Utils.closeQuietly(this.consumer, "source consumer");
        Utils.closeQuietly(this.offsetProducer, "offset producer");
        Utils.closeQuietly(this.metrics, "metrics");
        log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - startedAt);

        if (interrupted) {
            // Restore the flag only after cleanup so the close calls above are not themselves interrupted.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the version string reported by a fresh {@link MirrorSourceConnector}
     */
    @Override
    public String version() {
        MirrorSourceConnector connector = new MirrorSourceConnector();
        return connector.version();
    }

    /**
     * Polls the source consumer once and converts every fetched record into a
     * {@link SourceRecord}, recording age and byte-count metrics per record.
     *
     * <p>Consumer access is guarded by the {@code consumerAccess} permit. The permit is released
     * on every exit path once it has been acquired, including the early return taken while the
     * task is stopping, so a concurrent {@link #stop()} waiting for the permit is never starved.
     *
     * @return the converted records, or {@code null} when the permit is unavailable, the task is
     *         stopping, the poll produced no records, or the poll failed with a
     *         {@link KafkaException} (including {@link WakeupException})
     */
    @Override
    public List<SourceRecord> poll() {
        if (!this.consumerAccess.tryAcquire()) {
            return null;
        }
        try {
            if (this.stopping) {
                return null;
            }
            ConsumerRecords<byte[], byte[]> records = this.consumer.poll(this.pollTimeout);
            List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
            for (ConsumerRecord<byte[], byte[]> record : records) {
                SourceRecord converted = convertRecord(record);
                sourceRecords.add(converted);
                TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition().intValue());
                this.metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp());
                this.metrics.recordBytes(topicPartition, byteSize(record.value()));
            }
            if (sourceRecords.isEmpty()) {
                return null;
            }
            log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
            return sourceRecords;
        } catch (WakeupException e) {
            // Must precede KafkaException (its superclass); raised when stop() wakes the consumer.
            return null;
        } catch (KafkaException e) {
            log.warn("Failure during poll.", e);
            return null;
        } catch (Throwable e) {
            log.error("Failure during poll.", e);
            throw e;
        } finally {
            this.consumerAccess.release();
        }
    }

    /**
     * Records replication metrics for a delivered record and queues/dispatches an offset sync
     * for its partition when one is due. Does nothing while the task is stopping, or when the
     * metadata is missing or carries no offset.
     *
     * @param sourceRecord   the record that was handed to the framework
     * @param recordMetadata the producer metadata for the record; may be {@code null}
     */
    @Override
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        if (this.stopping) {
            return;
        }
        if (recordMetadata == null) {
            log.debug("No RecordMetadata (source record was probably filtered out during transformation) -- can't sync offsets for {}.", sourceRecord.topic());
            return;
        }
        if (!recordMetadata.hasOffset()) {
            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", sourceRecord.topic());
            return;
        }

        // Metrics are keyed by the record's own topic and partition.
        TopicPartition recordPartition = new TopicPartition(sourceRecord.topic(), sourceRecord.kafkaPartition().intValue());
        long latencyMs = System.currentTimeMillis() - sourceRecord.timestamp().longValue();
        this.metrics.countRecord(recordPartition);
        this.metrics.replicationLatency(recordPartition, latencyMs);

        // Offset syncs are keyed by the partition unwrapped from the record's source partition.
        TopicPartition upstreamPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
        long upstreamOffset = MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue();
        long downstreamOffset = recordMetadata.offset();
        maybeQueueOffsetSyncs(upstreamPartition, upstreamOffset, downstreamOffset);
        firePendingOffsetSyncs();
    }

    /**
     * Updates the partition's sync state with the given offsets and, when the state reports a
     * sync is due, queues an {@link OffsetSync} for the partition (replacing any sync already
     * queued for it) and resets the state's flag.
     *
     * @param topicPartition the partition the offsets belong to
     * @param j              the upstream offset
     * @param j2             the downstream offset
     */
    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long j, long j2) {
        PartitionState state = this.partitionStates.computeIfAbsent(
                topicPartition, ignored -> new PartitionState(this.maxOffsetLag));
        if (!state.update(j, j2)) {
            return;
        }
        OffsetSync sync = new OffsetSync(topicPartition, j, j2);
        synchronized (this) {
            this.pendingOffsetSyncs.put(topicPartition, sync);
        }
        state.reset();
    }

    /**
     * Sends queued offset syncs one at a time until the queue is empty or no in-flight permit
     * is available. The send itself happens outside the monitor.
     */
    private void firePendingOffsetSyncs() {
        OffsetSync claimed = claimNextOffsetSync();
        while (claimed != null) {
            sendOffsetSync(claimed);
            log.trace("Dispatched offset sync for {}", claimed.topicPartition());
            claimed = claimNextOffsetSync();
        }
    }

    /**
     * Removes and returns the first queued offset sync after acquiring an in-flight permit
     * for it, all under this object's monitor.
     *
     * @return the claimed sync, or {@code null} if the queue is empty or no permit is available
     *         (in which case the sync stays queued)
     */
    private synchronized OffsetSync claimNextOffsetSync() {
        Iterator<OffsetSync> queued = this.pendingOffsetSyncs.values().iterator();
        if (!queued.hasNext()) {
            log.trace("No more pending offset syncs");
            return null;
        }
        OffsetSync head = queued.next();
        if (!this.outstandingOffsetSyncs.tryAcquire()) {
            log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
            return null;
        }
        queued.remove();
        return head;
    }

    /**
     * Publishes the offset sync to partition 0 of the offset-syncs topic. The send callback
     * logs the outcome and then releases one in-flight permit.
     *
     * @param offsetSync the sync to publish
     */
    private void sendOffsetSync(OffsetSync offsetSync) {
        ProducerRecord<byte[], byte[]> syncRecord =
                new ProducerRecord<>(this.offsetSyncsTopic, 0, offsetSync.recordKey(), offsetSync.recordValue());
        this.offsetProducer.send(syncRecord, (metadata, failure) -> {
            if (failure == null) {
                log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(), offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
            } else {
                log.error("Failure sending offset sync.", failure);
            }
            this.outstandingOffsetSyncs.release();
        });
    }

    /**
     * Computes the starting offset of every given partition via {@link #loadOffset(TopicPartition)}.
     *
     * @param set the partitions to look up
     * @return a map from each partition to its starting offset
     */
    private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> set) {
        Map<TopicPartition, Long> startingOffsets = new HashMap<>();
        for (TopicPartition partition : set) {
            startingOffsets.put(partition, loadOffset(partition));
        }
        return startingOffsets;
    }

    /**
     * Reads the stored offset for the partition from the offset storage reader and returns the
     * offset that follows it.
     *
     * @param topicPartition the partition to look up
     * @return the unwrapped stored offset plus one
     */
    private Long loadOffset(TopicPartition topicPartition) {
        Map<String, ?> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, this.sourceClusterAlias);
        Map<String, ?> storedOffset = this.context.offsetStorageReader().offset(wrappedPartition);
        long lastStored = MirrorUtils.unwrapOffset(storedOffset).longValue();
        return Long.valueOf(lastStored + 1);
    }

    /**
     * Builds the {@link SourceRecord} for a consumed record: same partition number, key, value,
     * timestamp and headers, with the topic renamed through {@code formatRemoteTopic} and the
     * source partition/offset wrapped by {@link MirrorUtils}.
     *
     * @param consumerRecord the record read from the source consumer
     * @return the equivalent source record
     */
    SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        TopicPartition upstreamPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        Map<String, ?> sourcePartition = MirrorUtils.wrapPartition(upstreamPartition, this.sourceClusterAlias);
        Map<String, ?> sourceOffset = MirrorUtils.wrapOffset(consumerRecord.offset());
        String remoteTopic = formatRemoteTopic(consumerRecord.topic());
        Integer partition = Integer.valueOf(consumerRecord.partition());
        Long timestamp = Long.valueOf(consumerRecord.timestamp());
        return new SourceRecord(
                sourcePartition,
                sourceOffset,
                remoteTopic,
                partition,
                Schema.OPTIONAL_BYTES_SCHEMA,
                consumerRecord.key(),
                Schema.BYTES_SCHEMA,
                consumerRecord.value(),
                timestamp,
                convertHeaders(consumerRecord));
    }

    /**
     * Copies every header of the consumed record into a new {@link ConnectHeaders} as raw bytes.
     *
     * @param consumerRecord the record whose headers are copied
     * @return the copied headers
     */
    private Headers convertHeaders(ConsumerRecord<byte[], byte[]> consumerRecord) {
        ConnectHeaders copied = new ConnectHeaders();
        consumerRecord.headers().forEach(original -> copied.addBytes(original.key(), original.value()));
        return copied;
    }

    /**
     * Delegates to the replication policy to format the topic name for the source cluster alias.
     *
     * @param str the topic name on the source cluster
     * @return the name produced by the replication policy
     */
    private String formatRemoteTopic(String str) {
        String alias = this.sourceClusterAlias;
        return this.replicationPolicy.formatRemoteTopic(alias, str);
    }

    /**
     * @param bArr a byte array, possibly {@code null}
     * @return the array length, or 0 for {@code null}
     */
    private static int byteSize(byte[] bArr) {
        return bArr != null ? bArr.length : 0;
    }
}
