package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.class */
public class FlinkUnboundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);

    @VisibleForTesting
    protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
    private static final long SLEEP_ON_IDLE_MS = 50;
    private static final long MIN_WATERMARK_EMIT_INTERVAL_MS = 10;
    private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef;
    private final List<FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput> readers;
    private int currentReaderIndex;
    private volatile boolean shouldEmitWatermark;
    private final LinkedHashMap<Long, List<FlinkSourceSplit<T>>> pendingCheckpoints;

    public FlinkUnboundedSourceReader(String str, SourceReaderContext sourceReaderContext, PipelineOptions pipelineOptions, Function<WindowedValue<ValueWithRecordId<T>>, Long> function) {
        super(str, sourceReaderContext, pipelineOptions, function);
        this.dataAvailableFutureRef = new AtomicReference<>();
        this.readers = new ArrayList();
        this.currentReaderIndex = 0;
        this.pendingCheckpoints = new LinkedHashMap<>();
    }

    @VisibleForTesting
    protected FlinkUnboundedSourceReader(String str, SourceReaderContext sourceReaderContext, PipelineOptions pipelineOptions, ScheduledExecutorService scheduledExecutorService, Function<WindowedValue<ValueWithRecordId<T>>, Long> function) {
        super(str, scheduledExecutorService, sourceReaderContext, pipelineOptions, function);
        this.dataAvailableFutureRef = new AtomicReference<>();
        this.readers = new ArrayList();
        this.currentReaderIndex = 0;
        this.pendingCheckpoints = new LinkedHashMap<>();
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase
    protected void addSplitsToUnfinishedForCheckpoint(long j, List<FlinkSourceSplit<T>> list) {
        this.pendingCheckpoints.put(Long.valueOf(j), list);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Long, List<FlinkSourceSplit<T>>> entry : this.pendingCheckpoints.entrySet()) {
            if (entry.getKey().longValue() <= j) {
                Iterator<FlinkSourceSplit<T>> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    finalizeSourceSplit(it.next().getCheckpointMark());
                }
                arrayList.add(entry.getKey());
            }
        }
        LinkedHashMap<Long, List<FlinkSourceSplit<T>>> linkedHashMap = this.pendingCheckpoints;
        Objects.requireNonNull(linkedHashMap);
        arrayList.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase
    public void start() {
        createPendingBytesGauge(this.context);
        Long autoWatermarkInterval = ((FlinkPipelineOptions) this.pipelineOptions.as(FlinkPipelineOptions.class)).getAutoWatermarkInterval();
        if (autoWatermarkInterval == null) {
            Long valueOf = Long.valueOf(((FlinkPipelineOptions) this.pipelineOptions.as(FlinkPipelineOptions.class)).getMaxBundleTimeMills().longValue() / 5);
            autoWatermarkInterval = Long.valueOf(valueOf.longValue() > MIN_WATERMARK_EMIT_INTERVAL_MS ? valueOf.longValue() : MIN_WATERMARK_EMIT_INTERVAL_MS);
            LOG.warn("AutoWatermarkInterval is not set, watermarks will be emitted at a default interval of {} ms", autoWatermarkInterval);
        }
        scheduleTaskAtFixedRate(() -> {
            this.shouldEmitWatermark = true;
            CompletableFuture<Void> completableFuture = this.dataAvailableFutureRef.get();
            if (completableFuture != null) {
                completableFuture.complete(null);
            }
        }, autoWatermarkInterval.longValue(), autoWatermarkInterval.longValue());
    }

    public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> readerOutput) throws Exception {
        checkExceptionAndMaybeThrow();
        maybeEmitWatermark();
        maybeCreateReaderForNewSplits();
        FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput nextReaderWithData = nextReaderWithData();
        if (nextReaderWithData != null) {
            emitRecord(nextReaderWithData, readerOutput);
            return InputStatus.MORE_AVAILABLE;
        }
        if (noMoreSplits() && isEndOfAllReaders()) {
            LOG.info("No more splits and no reader available. Terminating consumption.");
            return InputStatus.END_OF_INPUT;
        }
        LOG.trace("No data available for now.");
        return InputStatus.NOTHING_AVAILABLE;
    }

    private boolean isEndOfAllReaders() {
        return allReaders().values().stream().allMatch(readerAndOutput -> {
            return asUnbounded(readerAndOutput.reader).getWatermark().getMillis() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        });
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase
    protected CompletableFuture<Void> isAvailableForAliveReaders() {
        CompletableFuture<Void> completableFuture = this.dataAvailableFutureRef.get();
        if (completableFuture != null) {
            if (!completableFuture.isDone()) {
                return completableFuture;
            }
            this.dataAvailableFutureRef.compareAndSet(completableFuture, null);
            return completableFuture;
        }
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        this.dataAvailableFutureRef.set(completableFuture2);
        if (this.shouldEmitWatermark || hasException()) {
            this.dataAvailableFutureRef.set(null);
            completableFuture2.complete(null);
        } else {
            LOG.debug("There is no data available, scheduling the idle reader checker.");
            scheduleTask(() -> {
                CompletableFuture<Void> completableFuture3 = this.dataAvailableFutureRef.get();
                if (completableFuture3 != null) {
                    completableFuture3.complete(null);
                }
            }, SLEEP_ON_IDLE_MS);
        }
        return completableFuture2;
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase
    protected FlinkSourceSplit<T> getReaderCheckpoint(int i, FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput readerAndOutput) {
        UnboundedSource.UnboundedReader unboundedReader = readerAndOutput.reader;
        UnboundedSource.CheckpointMark checkpointMark = unboundedReader.getCheckpointMark();
        return new FlinkSourceSplit<>(i, readerAndOutput.reader.getCurrentSource(), encodeCheckpointMark(unboundedReader.getCurrentSource().getCheckpointMarkCoder(), checkpointMark), checkpointMark);
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase
    protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> flinkSourceSplit) throws IOException {
        return createUnboundedSourceReader(flinkSourceSplit.getBeamSplitSource(), flinkSourceSplit.getSplitState());
    }

    private void emitRecord(FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput readerAndOutput, ReaderOutput<WindowedValue<ValueWithRecordId<T>>> readerOutput) {
        UnboundedSource.UnboundedReader asUnbounded = asUnbounded(readerAndOutput.reader);
        Object current = asUnbounded.getCurrent();
        byte[] currentRecordId = asUnbounded.getCurrentRecordId();
        WindowedValue of = WindowedValue.of(new ValueWithRecordId(current, currentRecordId), asUnbounded.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
        LOG.trace("Emitting record: {}", of);
        if (this.timestampExtractor == null) {
            readerAndOutput.getAndMaybeCreateSplitOutput(readerOutput).collect(of);
        } else {
            readerAndOutput.getAndMaybeCreateSplitOutput(readerOutput).collect(of, this.timestampExtractor.apply(of).longValue());
        }
        this.numRecordsInCounter.inc();
    }

    private void maybeEmitWatermark() {
        if (this.shouldEmitWatermark) {
            allReaders().values().forEach(readerAndOutput -> {
                SourceOutput sourceOutput = readerAndOutput.sourceOutput();
                if (sourceOutput != null) {
                    sourceOutput.emitWatermark(new Watermark(asUnbounded(readerAndOutput.reader).getWatermark().getMillis()));
                }
            });
            this.shouldEmitWatermark = false;
        }
    }

    private void maybeCreateReaderForNewSplits() throws Exception {
        while (!sourceSplits().isEmpty()) {
            Optional<FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput> createAndTrackNextReader = createAndTrackNextReader();
            if (createAndTrackNextReader.isPresent()) {
                this.readers.add(createAndTrackNextReader.get());
            } else {
                checkExceptionAndMaybeThrow();
            }
        }
    }

    private FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput nextReaderWithData() throws IOException {
        int size = this.readers.size();
        for (int i = 0; i < size; i++) {
            FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput readerAndOutput = this.readers.get(this.currentReaderIndex);
            this.currentReaderIndex = (this.currentReaderIndex + 1) % size;
            if (readerAndOutput.startOrAdvance()) {
                return readerAndOutput;
            }
        }
        return null;
    }

    private static <T> UnboundedSource.UnboundedReader<T> asUnbounded(Source.Reader<T> reader) {
        return (UnboundedSource.UnboundedReader) reader;
    }

    private void createPendingBytesGauge(SourceReaderContext sourceReaderContext) {
        sourceReaderContext.metricGroup().gauge(PENDING_BYTES_METRIC_NAME, () -> {
            long j = -1;
            Iterator<FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>>.ReaderAndOutput> it = allReaders().values().iterator();
            while (it.hasNext()) {
                long splitBacklogBytes = asUnbounded(it.next().reader).getSplitBacklogBytes();
                if (splitBacklogBytes != -1) {
                    j = j == -1 ? splitBacklogBytes : j + splitBacklogBytes;
                }
            }
            return Long.valueOf(j);
        });
    }

    private <CheckpointMarkT extends UnboundedSource.CheckpointMark> byte[] encodeCheckpointMark(Coder<CheckpointMarkT> coder, CheckpointMarkT checkpointmarkt) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    coder.encode(checkpointmarkt, byteArrayOutputStream);
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    $closeResource(null, byteArrayOutputStream);
                    return byteArray;
                } finally {
                }
            } catch (Throwable th2) {
                $closeResource(th, byteArrayOutputStream);
                throw th2;
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to encode checkpoint mark.", e);
        }
    }

    private <CheckpointMarkT extends UnboundedSource.CheckpointMark> Source.Reader<T> createUnboundedSourceReader(Source<T> source, byte[] bArr) throws IOException {
        UnboundedSource unboundedSource = (UnboundedSource) source;
        Coder checkpointMarkCoder = unboundedSource.getCheckpointMarkCoder();
        if (bArr == null) {
            return unboundedSource.createReader(this.pipelineOptions, (UnboundedSource.CheckpointMark) null);
        }
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        Throwable th = null;
        try {
            try {
                UnboundedSource.UnboundedReader createReader = unboundedSource.createReader(this.pipelineOptions, (UnboundedSource.CheckpointMark) checkpointMarkCoder.decode(byteArrayInputStream));
                $closeResource(null, byteArrayInputStream);
                return createReader;
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, byteArrayInputStream);
            throw th2;
        }
    }

    private void finalizeSourceSplit(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
        if (checkpointMark != null) {
            checkpointMark.finalizeCheckpoint();
        }
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
