package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.class */
public class FlinkSourceSplitEnumerator<T> implements SplitEnumeratorCompat<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
    private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
    private final Source<T> beamSource;
    private final PipelineOptions pipelineOptions;
    private final int numSplits;
    private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
    private boolean splitsInitialized = false;

    public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> splitEnumeratorContext, Source<T> source, PipelineOptions pipelineOptions, int i) {
        this.context = splitEnumeratorContext;
        this.beamSource = source;
        this.pipelineOptions = pipelineOptions;
        this.numSplits = i;
        this.pendingSplits = new HashMap(i);
    }

    public void start() {
        this.context.callAsync(() -> {
            try {
                LOG.info("Starting source {}", this.beamSource);
                List<? extends Source<T>> splitBeamSource = splitBeamSource();
                HashMap hashMap = new HashMap();
                int i = 0;
                Iterator<? extends Source<T>> it = splitBeamSource.iterator();
                while (it.hasNext()) {
                    ((List) hashMap.computeIfAbsent(Integer.valueOf(i % this.context.currentParallelism()), num -> {
                        return new ArrayList();
                    })).add(new FlinkSourceSplit(i, it.next()));
                    i++;
                }
                return hashMap;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, (map, th) -> {
            if (th != null) {
                throw new RuntimeException("Failed to start source enumerator.", th);
            }
            this.pendingSplits.putAll(map);
            this.splitsInitialized = true;
            sendPendingSplitsToSourceReaders();
        });
    }

    public void handleSplitRequest(int i, @Nullable String str) {
    }

    public void addSplitsBack(List<FlinkSourceSplit<T>> list, int i) {
        LOG.info("Adding splits {} back from subtask {}", list, Integer.valueOf(i));
        this.pendingSplits.computeIfAbsent(Integer.valueOf(i), num -> {
            return new ArrayList();
        }).addAll(list);
    }

    public void addReader(int i) {
        List<FlinkSourceSplit<T>> remove = this.pendingSplits.remove(Integer.valueOf(i));
        if (remove != null) {
            assignSplitsAndLog(remove, i);
        } else if (this.splitsInitialized) {
            LOG.info("There is no split for subtask {}. Signaling no more splits.", Integer.valueOf(i));
            this.context.signalNoMoreSplits(i);
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public Map<Integer, List<FlinkSourceSplit<T>>> m54snapshotState(long j) throws Exception {
        LOG.info("Taking snapshot for checkpoint {}", Long.valueOf(j));
        return snapshotState();
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat
    public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState() throws Exception {
        return this.pendingSplits;
    }

    public void close() throws IOException {
    }

    private List<? extends Source<T>> splitBeamSource() throws Exception {
        if (this.beamSource instanceof BoundedSource) {
            BoundedSource boundedSource = this.beamSource;
            return boundedSource.split(boundedSource.getEstimatedSizeBytes(this.pipelineOptions) / this.numSplits, this.pipelineOptions);
        }
        if (!(this.beamSource instanceof UnboundedSource)) {
            throw new IllegalStateException("Unknown source type " + this.beamSource.getClass());
        }
        List<? extends Source<T>> split = this.beamSource.split(this.numSplits, this.pipelineOptions);
        LOG.info("Split source {} to {} splits", this.beamSource, split);
        return split;
    }

    private void sendPendingSplitsToSourceReaders() {
        Iterator<Map.Entry<Integer, List<FlinkSourceSplit<T>>>> it = this.pendingSplits.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, List<FlinkSourceSplit<T>>> next = it.next();
            int intValue = next.getKey().intValue() % this.context.currentParallelism();
            if (this.context.registeredReaders().containsKey(Integer.valueOf(intValue))) {
                assignSplitsAndLog(next.getValue(), intValue);
                it.remove();
            }
        }
    }

    private void assignSplitsAndLog(List<FlinkSourceSplit<T>> list, int i) {
        this.context.assignSplits(new SplitsAssignment(Collections.singletonMap(Integer.valueOf(i), list)));
        this.context.signalNoMoreSplits(i);
        LOG.info("Assigned splits {} to subtask {}", list, Integer.valueOf(i));
    }
}
