/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.source;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarOrderedFetcherManager;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarSourceReaderBase;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that tracks, per checkpoint, the latest consumed {@link MessageId} of every
 * {@link TopicPartition} and acknowledges those cursors through the
 * {@link PulsarOrderedFetcherManager} once the checkpoint completes.
 *
 * <p>When {@code isEnableAutoAcknowledgeMessage()} is set on the source configuration, a
 * single-threaded scheduler additionally acknowledges the current cursors periodically,
 * independent of checkpoints.
 *
 * <p>Failures raised while acknowledging are stored and rethrown from
 * {@link #pollNext(ReaderOutput)} so that they surface on the thread that polls the reader.
 *
 * @param <OUT> type of the records emitted by this reader
 */
@Internal
public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedSourceReader.class);

    /** Cursors recorded at snapshot time, keyed by checkpoint id in ascending order. */
    @VisibleForTesting
    final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;

    /** Last consumed position of splits that finished but have not been acknowledged yet. */
    private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits;

    /** First failure seen while acknowledging cursors; checked on every {@code pollNext}. */
    private final AtomicReference<Throwable> cursorCommitThrowable = new AtomicReference<>();

    /** Scheduler for periodic acknowledgement; stays {@code null} unless auto-acknowledge is on. */
    private ScheduledExecutorService cursorScheduler;

    /**
     * Creates the reader and wires a {@link PulsarOrderedFetcherManager} that obtains its split
     * readers from {@code splitReaderSupplier}.
     *
     * @param elementsQueue queue shared with the fetcher manager
     * @param splitReaderSupplier factory for the per-fetcher split readers
     * @param recordEmitter emitter handed to the base reader
     * @param context reader context handed to the base reader
     * @param sourceConfiguration source configuration handed to the base reader
     * @param pulsarClient Pulsar client handed to the base reader
     * @param pulsarAdmin Pulsar admin client handed to the base reader
     */
    public PulsarOrderedSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue, Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier, PulsarRecordEmitter<OUT> recordEmitter, SourceReaderContext context, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) {
        super(elementsQueue, new PulsarOrderedFetcherManager(elementsQueue, splitReaderSupplier::get), recordEmitter, context, sourceConfiguration, pulsarClient, pulsarAdmin);
        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
        this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
    }

    /**
     * Starts the base reader and, if auto-acknowledge is enabled, schedules the periodic cursor
     * acknowledgement. The first run is delayed by {@code getMaxFetchTime()} and later runs
     * repeat every {@code getAutoCommitCursorInterval()} milliseconds.
     */
    @Override
    public void start() {
        super.start();
        if (this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            this.cursorScheduler = Executors.newSingleThreadScheduledExecutor();
            this.cursorScheduler.scheduleAtFixedRate(this::cumulativeAcknowledgmentMessage, this.sourceConfiguration.getMaxFetchTime().toMillis(), this.sourceConfiguration.getAutoCommitCursorInterval(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Rethrows any stored acknowledgement failure before delegating to the base reader.
     *
     * @throws RuntimeException if an earlier cursor acknowledgement failed
     */
    @Override
    public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
        this.checkErrorAndRethrow();
        return super.pollNext(output);
    }

    /**
     * Closes the finished splits and remembers their last consumed position so that it is
     * still acknowledged by a later checkpoint or periodic commit.
     *
     * @param finishedSplitIds states of the finished splits, keyed by split id
     */
    @Override
    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
        this.closeFinishedSplits(finishedSplitIds.keySet());
        LOG.debug("onSplitFinished event: {}", finishedSplitIds);
        for (PulsarPartitionSplitState state : finishedSplitIds.values()) {
            MessageId latestConsumedId = state.getLatestConsumedId();
            // A split that never consumed a message has no cursor to commit.
            if (latestConsumedId != null) {
                this.cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId);
            }
        }
    }

    /**
     * Snapshots the splits through the base reader and records the cursors that must be
     * acknowledged once {@code checkpointId} completes, including those of finished splits.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits returned by the base reader
     */
    @Override
    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
        List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
        Map<TopicPartition, MessageId> cursors = this.cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (PulsarPartitionSplit split : splits) {
            MessageId latestConsumedId = split.getLatestConsumedId();
            if (latestConsumedId != null) {
                cursors.put(split.getPartition(), latestConsumedId);
            }
        }
        cursors.putAll(this.cursorsOfFinishedSplits);
        return splits;
    }

    /**
     * Acknowledges the cursors recorded for {@code checkpointId} and drops the bookkeeping of
     * that checkpoint and all earlier ones. A failure is logged and stored for
     * {@link #pollNext(ReaderOutput)} instead of being thrown here.
     *
     * <p>If nothing is recorded for the checkpoint (already committed and purged, or never
     * snapshotted by this reader), the call is a no-op.
     *
     * @param checkpointId id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
        Map<TopicPartition, MessageId> cursors = this.cursorsToCommit.get(checkpointId);
        if (cursors == null) {
            // Without this guard the lookup miss would raise a NullPointerException below,
            // which the catch block would store and pollNext would turn into a reader failure.
            LOG.debug("Cursors for checkpoint {} have been committed or do not exist.", checkpointId);
            return;
        }
        try {
            ((PulsarOrderedFetcherManager)this.splitFetcherManager).acknowledgeMessages(cursors);
            LOG.debug("Successfully acknowledge cursors for checkpoint {}", checkpointId);
            this.cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
            // headMap is exclusive of its bound, so +1 also removes this checkpoint's entry.
            this.cursorsToCommit.headMap(checkpointId + 1L).clear();
        }
        catch (Exception e) {
            LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
            this.cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    /** Stops the periodic acknowledgement (if it was started) and closes the base reader. */
    @Override
    public void close() throws Exception {
        if (this.cursorScheduler != null) {
            this.cursorScheduler.shutdown();
        }
        super.close();
    }

    /** Throws a {@link RuntimeException} wrapping the stored acknowledgement failure, if any. */
    private void checkErrorAndRethrow() {
        Throwable cause = this.cursorCommitThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in acknowledge message.", cause);
        }
    }

    /**
     * Periodic task: acknowledges the latest consumed position of all current splits together
     * with the pending cursors of finished splits.
     */
    private void cumulativeAcknowledgmentMessage() {
        Map<TopicPartition, MessageId> cursors = new HashMap<>(this.cursorsOfFinishedSplits);
        try {
            // The snapshot is taken inside the try block on purpose: an exception escaping a
            // scheduleAtFixedRate task silently cancels every later run, so it is recorded
            // like an acknowledge failure instead.
            // NOTE(review): 1L looks like a placeholder checkpoint id used only to collect
            // the current split state - confirm against the base reader.
            List<PulsarPartitionSplit> splits = super.snapshotState(1L);
            for (PulsarPartitionSplit split : splits) {
                MessageId latestConsumedId = split.getLatestConsumedId();
                if (latestConsumedId != null) {
                    cursors.put(split.getPartition(), latestConsumedId);
                }
            }
            ((PulsarOrderedFetcherManager)this.splitFetcherManager).acknowledgeMessages(cursors);
            this.cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
        }
        catch (Exception e) {
            LOG.error("Fail in auto cursor commit.", e);
            this.cursorCommitThrowable.compareAndSet(null, e);
        }
    }
}

