package org.apache.flink.cdc.connectors.base.source.enumerator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsReportEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitAssignedEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitMetaEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitMetaRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitUpdateAckEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitUpdateRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.class */
public class IncrementalSourceEnumerator implements SplitEnumerator<SourceSplitBase, PendingSplitsState> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) IncrementalSourceEnumerator.class);
    private static final long CHECK_EVENT_INTERVAL = 30000;
    protected final SplitEnumeratorContext<SourceSplitBase> context;
    private final SourceConfig sourceConfig;
    protected final SplitAssigner splitAssigner;
    private List<List<FinishedSnapshotSplitInfo>> finishedSnapshotSplitMeta;
    private Boundedness boundedness;

    @Nullable
    protected Integer streamSplitTaskId = null;
    protected final TreeSet<Integer> readersAwaitingSplit = new TreeSet<>();

    public IncrementalSourceEnumerator(SplitEnumeratorContext<SourceSplitBase> splitEnumeratorContext, SourceConfig sourceConfig, SplitAssigner splitAssigner, Boundedness boundedness) {
        this.context = splitEnumeratorContext;
        this.sourceConfig = sourceConfig;
        this.splitAssigner = splitAssigner;
        this.boundedness = boundedness;
    }

    public void start() {
        this.splitAssigner.open();
        requestStreamSplitUpdateIfNeed();
        this.context.callAsync(this::getRegisteredReader, this::syncWithReaders, CHECK_EVENT_INTERVAL, CHECK_EVENT_INTERVAL);
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            this.readersAwaitingSplit.add(Integer.valueOf(i));
            assignSplits();
        }
    }

    public void addSplitsBack(List<SourceSplitBase> list, int i) {
        LOG.debug("Incremental Source Enumerator adds splits back: {}", list);
        Optional<SourceSplitBase> findAny = list.stream().filter((v0) -> {
            return v0.isStreamSplit();
        }).findAny();
        if (findAny.isPresent()) {
            LOG.info("The enumerator adds add stream split back: {}", findAny);
            this.streamSplitTaskId = null;
        }
        this.splitAssigner.addSplits(list);
    }

    public void addReader(int i) {
        requestStreamSplitUpdateIfNeed();
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {
            LOG.info("The enumerator receives finished split offsets {} from subtask {}.", sourceEvent, Integer.valueOf(i));
            Map<String, Offset> finishedOffsets = ((FinishedSnapshotSplitsReportEvent) sourceEvent).getFinishedOffsets();
            this.splitAssigner.onFinishedSplits(finishedOffsets);
            requestStreamSplitUpdateIfNeed();
            this.context.sendEventToSourceReader(i, new FinishedSnapshotSplitsAckEvent(new ArrayList(finishedOffsets.keySet())));
            return;
        }
        if (sourceEvent instanceof StreamSplitMetaRequestEvent) {
            LOG.debug("The enumerator receives request for stream split meta from subtask {}.", Integer.valueOf(i));
            sendStreamMetaRequestEvent(i, (StreamSplitMetaRequestEvent) sourceEvent);
            return;
        }
        if (sourceEvent instanceof LatestFinishedSplitsNumberRequestEvent) {
            LOG.info("The enumerator receives request from subtask {} for the latest finished splits number after added newly tables. ", Integer.valueOf(i));
            handleLatestFinishedSplitNumberRequest(i);
        } else if (sourceEvent instanceof StreamSplitUpdateAckEvent) {
            LOG.info("The enumerator receives event that the streamSplit split has been updated from subtask {}. ", Integer.valueOf(i));
            this.splitAssigner.onStreamSplitUpdated();
        } else if (sourceEvent instanceof StreamSplitAssignedEvent) {
            LOG.info("The enumerator receives notice from subtask {} for the stream split assignment. ", Integer.valueOf(i));
            this.streamSplitTaskId = Integer.valueOf(i);
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PendingSplitsState m1135snapshotState(long j) {
        return this.splitAssigner.snapshotState(j);
    }

    public void notifyCheckpointComplete(long j) {
        this.splitAssigner.notifyCheckpointComplete(j);
        assignSplits();
    }

    public void close() {
        LOG.info("Closing enumerator...");
        this.splitAssigner.close();
    }

    protected void assignSplits() {
        Iterator<Integer> it = this.readersAwaitingSplit.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (!this.context.registeredReaders().containsKey(Integer.valueOf(intValue))) {
                it.remove();
            } else if (shouldCloseIdleReader(intValue)) {
                this.context.signalNoMoreSplits(intValue);
                it.remove();
                LOG.info("Close idle reader of subtask {}", Integer.valueOf(intValue));
            } else {
                Optional<SourceSplitBase> next = this.splitAssigner.getNext();
                if (!next.isPresent()) {
                    requestStreamSplitUpdateIfNeed();
                    return;
                }
                SourceSplitBase sourceSplitBase = next.get();
                this.context.assignSplit(sourceSplitBase, intValue);
                if (sourceSplitBase instanceof StreamSplit) {
                    this.streamSplitTaskId = Integer.valueOf(intValue);
                }
                it.remove();
                LOG.info("Assign split {} to subtask {}", sourceSplitBase, Integer.valueOf(intValue));
            }
        }
    }

    private boolean shouldCloseIdleReader(int i) {
        return this.splitAssigner.noMoreSplits() && (this.boundedness == Boundedness.BOUNDED || !(!this.sourceConfig.isCloseIdleReaders() || this.streamSplitTaskId == null || this.streamSplitTaskId.intValue() == i));
    }

    protected int[] getRegisteredReader() {
        return this.context.registeredReaders().keySet().stream().mapToInt((v0) -> {
            return v0.intValue();
        }).toArray();
    }

    protected void syncWithReaders(int[] iArr, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", th);
        }
        if (this.splitAssigner.waitingForFinishedSplits()) {
            for (int i : iArr) {
                this.context.sendEventToSourceReader(i, new FinishedSnapshotSplitsRequestEvent());
            }
        }
        requestStreamSplitUpdateIfNeed();
    }

    private void requestStreamSplitUpdateIfNeed() {
        if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(this.splitAssigner.getAssignerStatus())) {
            if (this.streamSplitTaskId != null) {
                LOG.info("The enumerator requests subtask {} to update the stream split after newly added table.", this.streamSplitTaskId);
                this.context.sendEventToSourceReader(this.streamSplitTaskId.intValue(), new StreamSplitUpdateRequestEvent());
                return;
            }
            for (int i : getRegisteredReader()) {
                LOG.info("The enumerator requests subtask {} to update the stream split after newly added table.", Integer.valueOf(i));
                this.context.sendEventToSourceReader(i, new StreamSplitUpdateRequestEvent());
            }
        }
    }

    private void sendStreamMetaRequestEvent(int i, StreamSplitMetaRequestEvent streamSplitMetaRequestEvent) {
        if (this.finishedSnapshotSplitMeta == null) {
            List<FinishedSnapshotSplitInfo> finishedSplitInfos = this.splitAssigner.getFinishedSplitInfos();
            if (finishedSplitInfos.isEmpty()) {
                LOG.error("The assigner offer empty finished split information, this should not happen");
                throw new FlinkRuntimeException("The assigner offer empty finished split information, this should not happen");
            }
            this.finishedSnapshotSplitMeta = Lists.partition(finishedSplitInfos, this.sourceConfig.getSplitMetaGroupSize());
        }
        int requestMetaGroupId = streamSplitMetaRequestEvent.getRequestMetaGroupId();
        int totalFinishedSplitSize = streamSplitMetaRequestEvent.getTotalFinishedSplitSize();
        int size = this.splitAssigner.getFinishedSplitInfos().size();
        if (totalFinishedSplitSize > size) {
            LOG.warn("Total finished split size of subtask {} is {}, while total finished split size of enumerator is only {}. Try to truncate it", Integer.valueOf(i), Integer.valueOf(totalFinishedSplitSize), Integer.valueOf(size));
            this.context.sendEventToSourceReader(i, new StreamSplitMetaEvent(streamSplitMetaRequestEvent.getSplitId(), requestMetaGroupId, null, size));
        } else {
            if (this.finishedSnapshotSplitMeta.size() <= requestMetaGroupId) {
                throw new FlinkRuntimeException(String.format("The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.", Integer.valueOf(requestMetaGroupId), Integer.valueOf(this.finishedSnapshotSplitMeta.size() - 1), Integer.valueOf(totalFinishedSplitSize), Integer.valueOf(size)));
            }
            this.context.sendEventToSourceReader(i, new StreamSplitMetaEvent(streamSplitMetaRequestEvent.getSplitId(), requestMetaGroupId, (List) this.finishedSnapshotSplitMeta.get(requestMetaGroupId).stream().map((v0) -> {
                return v0.serialize();
            }).collect(Collectors.toList()), size));
        }
    }

    private void handleLatestFinishedSplitNumberRequest(int i) {
        if (this.splitAssigner instanceof HybridSplitAssigner) {
            this.context.sendEventToSourceReader(i, new LatestFinishedSplitsNumberEvent(this.splitAssigner.getFinishedSplitInfos().size()));
        }
    }
}
