package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.class */
public abstract class FlinkSourceReaderBase<T, OutputT> implements SourceReader<OutputT, FlinkSourceSplit<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
    protected static final CompletableFuture<Void> AVAILABLE_NOW = CompletableFuture.completedFuture(null);
    protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
    protected static final Exception NO_EXCEPTION = new Exception();
    protected final PipelineOptions pipelineOptions;

    @Nullable
    protected final Function<OutputT, Long> timestampExtractor;
    private final Queue<FlinkSourceSplit<T>> sourceSplits;
    private final ConcurrentMap<Integer, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> beamSourceReaders;
    protected final SourceReaderContext context;
    private final ScheduledExecutorService executor;
    protected final ReaderInvocationUtil<T, Source.Reader<T>> invocationUtil;
    protected final Counter numRecordsInCounter;
    protected final long idleTimeoutMs;
    private final CompletableFuture<Void> idleTimeoutFuture;
    private final AtomicReference<Throwable> exception;
    private boolean idleTimeoutCountingDown;
    private CompletableFuture<Void> waitingForSplitChangeFuture;
    private boolean noMoreSplits;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase$ErrorRecordingRunnable.class */
    /**
     * Decorates a {@link Runnable} so that anything it throws is passed to
     * {@link FlinkSourceReaderBase#recordException(Throwable)} of the enclosing reader instead of
     * propagating out of {@link #run()}.
     */
    public final class ErrorRecordingRunnable implements Runnable {
        /** The task that is actually executed. */
        private final Runnable delegate;

        ErrorRecordingRunnable(Runnable runnable) {
            delegate = runnable;
        }

        /** Runs the wrapped task and records, rather than rethrows, any failure. */
        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable failure) {
                // Deliberately broad: every kind of failure is recorded on the enclosing reader.
                recordException(failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase$ReaderAndOutput.class */
    /**
     * Bundles a Beam {@link Source.Reader} with the id of its split, a flag telling whether the
     * reader has been started yet, and a lazily created per-split {@link SourceOutput}.
     */
    public final class ReaderAndOutput {
        public final String splitId;
        public final Source.Reader<T> reader;

        /** Becomes {@code true} once {@link #startOrAdvance()} has issued the start call. */
        private boolean started;

        /** Created on first use by {@link #getAndMaybeCreateSplitOutput(ReaderOutput)}. */
        @Nullable
        private SourceOutput<OutputT> outputForSplit;

        /**
         * @param str the split id
         * @param reader the reader for that split
         * @param z whether the reader counts as already started
         */
        public ReaderAndOutput(String str, Source.Reader<T> reader, boolean z) {
            this.splitId = str;
            this.reader = reader;
            this.started = z;
        }

        /**
         * Returns the output for this split, creating it from {@code readerOutput} on the first call
         * and reusing it afterwards.
         */
        public SourceOutput<OutputT> getAndMaybeCreateSplitOutput(ReaderOutput<OutputT> readerOutput) {
            SourceOutput<OutputT> existing = outputForSplit;
            if (existing != null) {
                return existing;
            }
            SourceOutput<OutputT> created = readerOutput.createOutputForSplit(splitId);
            outputForSplit = created;
            return created;
        }

        /**
         * Starts the reader on the first call and advances it on every later call, both through the
         * enclosing reader's {@code invocationUtil}.
         *
         * @return the result of the underlying start/advance invocation
         * @throws IOException if the underlying invocation fails
         */
        public boolean startOrAdvance() throws IOException {
            if (!started) {
                // The flag is flipped before the start call, so a failed start is not retried as a start.
                started = true;
                return invocationUtil.invokeStart(reader);
            }
            return invocationUtil.invokeAdvance(reader);
        }

        /** Returns the per-split output, or {@code null} if it has not been created yet. */
        @Nullable
        public SourceOutput<OutputT> sourceOutput() {
            return outputForSplit;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a reader backed by its own single-threaded scheduled executor. The executor thread is
     * named {@code FlinkSource-Executor-Thread-<subtask index>}.
     *
     * @param str forwarded unchanged to the five-argument constructor
     * @param sourceReaderContext the Flink reader context; also supplies the subtask index for the thread name
     * @param pipelineOptions the pipeline options
     * @param function optional timestamp extractor, may be {@code null}
     */
    public FlinkSourceReaderBase(String str, SourceReaderContext sourceReaderContext, PipelineOptions pipelineOptions, @Nullable Function<OutputT, Long> function) {
        this(
                str,
                Executors.newSingleThreadScheduledExecutor(
                        task -> new Thread(task, "FlinkSource-Executor-Thread-" + sourceReaderContext.getIndexOfSubtask())),
                sourceReaderContext,
                pipelineOptions,
                function);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a reader that runs its background tasks on the supplied executor.
     *
     * @param str passed as the first argument to {@link ReaderInvocationUtil} (presumably the step
     *     name; confirm against {@code ReaderInvocationUtil})
     * @param scheduledExecutorService executor used by {@code scheduleTask}, {@code
     *     scheduleTaskAtFixedRate} and {@code execute}; it is shut down in {@code close()}
     * @param sourceReaderContext the Flink reader context; also supplies the metric group and the
     *     records-in counter
     * @param pipelineOptions the pipeline options; the idle shutdown timeout is read from their
     *     {@link FlinkPipelineOptions} view
     * @param function optional timestamp extractor, may be {@code null}
     */
    public FlinkSourceReaderBase(String str, ScheduledExecutorService scheduledExecutorService, SourceReaderContext sourceReaderContext, PipelineOptions pipelineOptions, @Nullable Function<OutputT, Long> function) {
        // Diamond instead of raw types: keeps the collections type-checked without unchecked warnings.
        this.sourceSplits = new ArrayDeque<>();
        this.context = sourceReaderContext;
        this.pipelineOptions = pipelineOptions;
        this.timestampExtractor = function;
        this.beamSourceReaders = new ConcurrentHashMap<>();
        // NO_EXCEPTION is the sentinel meaning "nothing recorded yet".
        this.exception = new AtomicReference<>(NO_EXCEPTION);
        this.executor = scheduledExecutorService;
        this.idleTimeoutMs = pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs().longValue();
        this.idleTimeoutFuture = new CompletableFuture<>();
        this.waitingForSplitChangeFuture = new CompletableFuture<>();
        this.idleTimeoutCountingDown = false;
        this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(sourceReaderContext);
        this.invocationUtil = new ReaderInvocationUtil<>(str, pipelineOptions, new FlinkMetricContainerWithoutAccumulator(sourceReaderContext.metricGroup()));
    }

    /** Does nothing; this base class performs no work on start. */
    @Override
    public void start() {
        // Intentionally empty.
    }

    /**
     * Builds the list of splits to checkpoint: first every split that has not been turned into a
     * reader yet, then one checkpoint split per tracked reader as produced by
     * {@link #getReaderCheckpoint(int, ReaderAndOutput)}.
     *
     * @param j not used by this implementation
     * @return a new list holding the pending splits and the reader checkpoints
     * @throws RuntimeException if a background task has recorded an exception
     * @throws IllegalStateException if checkpointing one of the readers fails with an {@link IOException}
     */
    @Override
    public List<FlinkSourceSplit<T>> snapshotState(long j) {
        checkExceptionAndMaybeThrow();
        // Typed list instead of a raw ArrayList, so additions and the return value are type-checked.
        List<FlinkSourceSplit<T>> splitStates = new ArrayList<>(sourceSplits);
        beamSourceReaders.forEach((splitIndex, readerAndOutput) -> {
            try {
                splitStates.add(getReaderCheckpoint(splitIndex, readerAndOutput));
            } catch (IOException e) {
                // forEach cannot propagate checked exceptions, so wrap and keep the cause.
                throw new IllegalStateException(String.format("Failed to get checkpoint for split %d", splitIndex), e);
            }
        });
        return splitStates;
    }

    /**
     * Returns a future that signals when this reader may have something to do.
     *
     * <ul>
     *   <li>With pending splits or tracked readers: completes when either
     *       {@link #isAvailableForAliveReaders()} or the split-change future completes.
     *   <li>With nothing to read and no more splits coming: the idle-timeout future, after making
     *       sure the countdown has been started.
     *   <li>Otherwise: the split-change future.
     * </ul>
     *
     * An already completed split-change future is replaced by a fresh one before it is handed out.
     *
     * @throws RuntimeException if a background task has recorded an exception
     */
    @Override
    public CompletableFuture<Void> isAvailable() {
        checkExceptionAndMaybeThrow();
        boolean nothingToRead = sourceSplits.isEmpty() && beamSourceReaders.isEmpty();
        if (nothingToRead) {
            if (noMoreSplits) {
                LOG.debug("All splits have been read, waiting for shutdown timeout {}", Long.valueOf(idleTimeoutMs));
                checkIdleTimeoutAndMaybeStartCountdown();
                return idleTimeoutFuture;
            }
            if (waitingForSplitChangeFuture.isDone()) {
                waitingForSplitChangeFuture = new CompletableFuture<>();
            }
            return waitingForSplitChangeFuture;
        }
        // The subclass is asked first; only afterwards is a completed split-change future renewed.
        CompletableFuture<Void> aliveReadersFuture = isAvailableForAliveReaders();
        if (waitingForSplitChangeFuture.isDone()) {
            waitingForSplitChangeFuture = new CompletableFuture<>();
        }
        CompletableFuture<Object> either = CompletableFuture.anyOf(aliveReadersFuture, waitingForSplitChangeFuture);
        // Drop the anyOf result to obtain a CompletableFuture<Void>.
        return either.thenAccept(ignored -> {
        });
    }

    /**
     * Marks that no further splits will arrive and completes the current split-change future so
     * that anyone waiting on it is woken up.
     *
     * @throws RuntimeException if a background task has recorded an exception
     */
    @Override
    public void notifyNoMoreSplits() {
        checkExceptionAndMaybeThrow();
        LOG.info("Received NoMoreSplits signal from enumerator.");
        noMoreSplits = true;
        waitingForSplitChangeFuture.complete(null);
    }

    /**
     * Appends the given splits to the queue of pending splits and completes the current
     * split-change future so that anyone waiting on it is woken up.
     *
     * @param list the splits to add
     * @throws RuntimeException if a background task has recorded an exception
     */
    @Override
    public void addSplits(List<FlinkSourceSplit<T>> list) {
        checkExceptionAndMaybeThrow();
        LOG.info("Adding splits {}", list);
        sourceSplits.addAll(list);
        waitingForSplitChangeFuture.complete(null);
    }

    /**
     * Closes every tracked Beam reader and shuts down the executor.
     *
     * <p>A failure while closing one reader does not prevent the remaining readers from being
     * closed, and the executor is shut down in all cases. The first failure is rethrown once the
     * cleanup is done; later failures are attached to it as suppressed exceptions.
     *
     * @throws Exception the first exception thrown while closing a reader
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        try {
            for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
                try {
                    readerAndOutput.reader.close();
                } catch (Exception e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else if (firstFailure != e) {
                        // Throwable.addSuppressed rejects self-suppression, hence the identity check.
                        firstFailure.addSuppressed(e);
                    }
                }
            }
        } finally {
            // Always release the executor thread, even if closing a reader failed.
            executor.shutdown();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Supplies the availability future for the readers currently alive. {@link #isAvailable()}
     * calls this when there are pending splits or tracked readers and combines the result with the
     * split-change future, so the returned future must not be {@code null}.
     */
    protected abstract CompletableFuture<Void> isAvailableForAliveReaders();

    /**
     * Produces the split to store in a checkpoint for one tracked reader. Called by
     * {@code snapshotState} once per entry of the reader map.
     *
     * @param i the key under which the reader is tracked (the split index it was registered with)
     * @param readerAndOutput the tracked reader
     * @return the split added to the snapshot for this reader
     * @throws IOException if the checkpoint cannot be produced; {@code snapshotState} wraps it in an
     *     {@link IllegalStateException}
     */
    protected abstract FlinkSourceSplit<T> getReaderCheckpoint(int i, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput readerAndOutput) throws IOException;

    /**
     * Creates the Beam reader for a split. Called by {@link #createAndTrackNextReader()} with the
     * split it just took from the pending queue.
     *
     * @param flinkSourceSplit the split to read, never {@code null}
     * @return the reader for that split
     * @throws IOException if the reader cannot be created
     */
    protected abstract Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> flinkSourceSplit) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Takes the next pending split, if any, creates a not-yet-started reader for it and registers
     * that reader under the split's index.
     *
     * @return the newly tracked reader, or an empty {@link Optional} when no split is pending
     * @throws IOException if {@link #createReader(FlinkSourceSplit)} fails
     */
    public final Optional<FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> createAndTrackNextReader() throws IOException {
        FlinkSourceSplit<T> nextSplit = sourceSplits.poll();
        if (nextSplit == null) {
            return Optional.empty();
        }
        String splitId = nextSplit.splitId();
        Source.Reader<T> reader = createReader(nextSplit);
        ReaderAndOutput tracked = new ReaderAndOutput(splitId, reader, false);
        beamSourceReaders.put(Integer.valueOf(nextSplit.splitIndex()), tracked);
        return Optional.of(tracked);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops tracking the reader registered under the given split index and closes it.
     *
     * @param i the split index the reader was registered with
     * @throws IllegalStateException if no reader is tracked for that index
     * @throws IOException if closing the reader fails
     */
    public final void finishSplit(int i) throws IOException {
        Integer splitIndex = Integer.valueOf(i);
        ReaderAndOutput finished = beamSourceReaders.remove(splitIndex);
        if (finished != null) {
            LOG.info("Finished reading from split {}", finished.splitId);
            finished.reader.close();
            return;
        }
        throw new IllegalStateException("SourceReader for split " + i + " should never be null!");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drives the idle-shutdown timeout. With a non-positive timeout the idle future is completed
     * right away. Otherwise a one-shot task that completes it after {@code idleTimeoutMs}
     * milliseconds is scheduled, at most once.
     *
     * @return whether the idle-timeout future is done at the time of the call
     */
    public final boolean checkIdleTimeoutAndMaybeStartCountdown() {
        if (idleTimeoutMs > 0) {
            if (!idleTimeoutCountingDown) {
                scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
                // Set only after scheduling succeeded, so the countdown is started exactly once.
                idleTimeoutCountingDown = true;
            }
        } else {
            idleTimeoutFuture.complete(null);
        }
        return idleTimeoutFuture.isDone();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns {@code true} once {@code notifyNoMoreSplits()} has been called on this reader. */
    public final boolean noMoreSplits() {
        return noMoreSplits;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Schedules a one-shot task on the reader's executor. The task is wrapped so that anything it
     * throws is recorded via {@link #recordException(Throwable)}.
     *
     * @param runnable the task to run
     * @param j the delay in milliseconds
     */
    public void scheduleTask(Runnable runnable, long j) {
        Runnable recordingTask = new ErrorRecordingRunnable(runnable);
        ignoreReturnValue(executor.schedule(recordingTask, j, TimeUnit.MILLISECONDS));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Schedules a fixed-rate task on the reader's executor. The task is wrapped so that anything it
     * throws is recorded via {@link #recordException(Throwable)}.
     *
     * @param runnable the task to run
     * @param j the initial delay in milliseconds
     * @param j2 the period in milliseconds
     */
    public void scheduleTaskAtFixedRate(Runnable runnable, long j, long j2) {
        Runnable recordingTask = new ErrorRecordingRunnable(runnable);
        ignoreReturnValue(executor.scheduleAtFixedRate(recordingTask, j, j2, TimeUnit.MILLISECONDS));
    }

    /**
     * Submits a task to the reader's executor. The task is wrapped so that anything it throws is
     * recorded via {@link #recordException(Throwable)}.
     *
     * @param runnable the task to run
     */
    protected void execute(Runnable runnable) {
        Runnable recordingTask = new ErrorRecordingRunnable(runnable);
        executor.execute(recordingTask);
    }

    /**
     * Records a failure so that {@link #checkExceptionAndMaybeThrow()} can surface it later. The
     * first recorded throwable becomes the primary one; every later, different throwable is
     * attached to it as a suppressed exception.
     *
     * @param th the failure to record
     */
    protected void recordException(Throwable th) {
        if (exception.compareAndSet(NO_EXCEPTION, th)) {
            return;
        }
        Throwable primary = exception.get();
        // Throwable.addSuppressed throws IllegalArgumentException on self-suppression, which would
        // escape from the error handler itself if the same instance were recorded twice.
        if (primary != th) {
            primary.addSuppressed(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Throws if a failure has been recorded via {@link #recordException(Throwable)}.
     *
     * @throws RuntimeException wrapping the recorded failure as its cause
     */
    public void checkExceptionAndMaybeThrow() {
        Throwable recorded = exception.get();
        if (recorded == NO_EXCEPTION) {
            return;
        }
        throw new RuntimeException("The source reader received exception.", recorded);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns {@code true} if the stored exception is no longer the {@code NO_EXCEPTION} sentinel. */
    public boolean hasException() {
        Throwable recorded = exception.get();
        return recorded != NO_EXCEPTION;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns an unmodifiable view of the splits that have not been turned into readers yet. */
    public Collection<FlinkSourceSplit<T>> sourceSplits() {
        Collection<FlinkSourceSplit<T>> readOnlyView = Collections.unmodifiableCollection(sourceSplits);
        return readOnlyView;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns an unmodifiable view of the tracked readers, keyed by split index. */
    public Map<Integer, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> allReaders() {
        Map<Integer, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> readOnlyView = Collections.unmodifiableMap(beamSourceReaders);
        return readOnlyView;
    }

    /**
     * Accepts a value and does nothing with it. The scheduling helpers pass the futures returned by
     * the executor here; presumably this marks the result as deliberately ignored (confirm).
     *
     * @param obj the value to discard
     */
    protected static void ignoreReturnValue(Object obj) {
        // Intentionally empty.
    }
}
