/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.Workarounds;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BeamStoppableFunction;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedSourceWrapper<@UnknownKeyFor OutputT, @UnknownKeyFor CheckpointMarkT extends // Could not load outer class - annotation placement on inner may be incorrect
@UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>
extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
implements ProcessingTimeCallback,
BeamStoppableFunction,
CheckpointListener,
CheckpointedFunction {
    /** Class-wide logger. */
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
    /** Step name supplied to the constructor; handed to the reader invoker created in run(). */
    private final @UnknownKeyFor @NonNull @Initialized String stepName;
    /** Serializable holder for the pipeline options given to the constructor; used whenever a reader is created. */
    private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions serializedOptions;
    /** True when the wrapped source is a BoundedToUnboundedSourceAdapter; run() then drains the readers one after another. */
    private final @UnknownKeyFor @NonNull @Initialized boolean isConvertedBoundedSource;
    /** Coder for (source, checkpoint mark) pairs; null when the source has no checkpoint mark coder, which disables snapshotting. */
    private final @UnknownKeyFor @NonNull @Initialized KvCoder<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> checkpointCoder;
    /** Result of source.split(parallelism, options), computed once in the constructor. */
    private final @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
    /** Value of FlinkPipelineOptions.getShutdownSourcesAfterIdleMs(); bounds the idle wait in finalizeSource(). */
    private final @UnknownKeyFor @NonNull @Initialized long idleTimeoutMs;
    /** Sources handled by this subtask; filled in open() either from restored state or from splitSources. */
    private transient @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
    /** Readers for localSplitSources; entry i is the reader created from localSplitSources.get(i). */
    private transient @UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<OutputT>> localReaders;
    /** Polled by the read loops and timers; set to false by cancel() and stop(). Volatile because it is written from those methods while run() is looping. */
    private volatile @UnknownKeyFor @NonNull @Initialized boolean isRunning = true;
    /** Runtime context captured in open(); used for subtask indexing and for registering watermark timers. */
    private transient @UnknownKeyFor @NonNull @Initialized StreamingRuntimeContext runtimeContext;
    /** Source context set by run() or setSourceContext(); its checkpoint lock guards element and watermark emission. */
    private transient // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized SourceFunction.SourceContext<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<OutputT>>> context;
    /** Insertion-ordered map from checkpoint id to the marks taken for it; filled by snapshotState(), drained by notifyCheckpointComplete(). */
    private transient @UnknownKeyFor @NonNull @Initialized LinkedHashMap<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized List<CheckpointMarkT>> pendingCheckpoints;
    /** Cap on retained pending checkpoints; snapshotState() evicts the oldest entries once this many are pending. */
    private static final @UnknownKeyFor @NonNull @Initialized int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
    /** Operator list state holding (source, checkpoint mark) pairs; obtained in initializeState() and left unset when checkpointCoder is null. */
    private transient @UnknownKeyFor @NonNull @Initialized ListState<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> stateForCheckpoint;
    /** Set in initializeState() when the initialization context reports restored state; makes open() rebuild readers from stateForCheckpoint. */
    private transient @UnknownKeyFor @NonNull @Initialized boolean isRestored = false;
    /** Set by onProcessingTime() once the minimum reader watermark reaches BoundedWindow.TIMESTAMP_MAX_VALUE; ends the round-robin loop in run(). */
    private transient @UnknownKeyFor @NonNull @Initialized boolean maxWatermarkReached;
    /** Metric container created in open(); close() registers its metrics for the pipeline result. */
    private transient @UnknownKeyFor @NonNull @Initialized FlinkMetricContainer metricContainer;

    public UnboundedSourceWrapper(@UnknownKeyFor @NonNull @Initialized String stepName, @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT> source, @UnknownKeyFor @NonNull @Initialized int parallelism) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Coder checkpointMarkCoder;
        this.stepName = stepName;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.isConvertedBoundedSource = source instanceof UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
        if (source.requiresDeduping()) {
            LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
        }
        if ((checkpointMarkCoder = source.getCheckpointMarkCoder()) == null) {
            LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots.");
            this.checkpointCoder = null;
        } else {
            SerializableCoder sourceCoder = SerializableCoder.of((TypeDescriptor)new TypeDescriptor<UnboundedSource>(){});
            this.checkpointCoder = KvCoder.of((Coder)sourceCoder, (Coder)checkpointMarkCoder);
        }
        this.splitSources = source.split(parallelism, pipelineOptions);
        FlinkPipelineOptions options = (FlinkPipelineOptions)pipelineOptions.as(FlinkPipelineOptions.class);
        this.idleTimeoutMs = options.getShutdownSourcesAfterIdleMs();
    }

    /**
     * Prepares this subtask for reading: installs the pipeline options for file systems,
     * captures the runtime context, creates the metric container, and builds the local
     * source/reader lists.
     *
     * <p>When state was restored (see {@code initializeState}), sources and readers are rebuilt
     * from the checkpointed (source, mark) pairs. Otherwise the pre-computed splits are
     * distributed round-robin: split {@code i} goes to the subtask whose index equals
     * {@code i % numSubtasks}, and its reader is created without a checkpoint mark.
     *
     * @param parameters Flink configuration; not read by this method
     * @throws Exception if reading the restored state or creating a reader fails
     */
    public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) throws @UnknownKeyFor @NonNull @Initialized Exception {
        FileSystems.setDefaultPipelineOptions(this.serializedOptions.get());
        this.runtimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
        this.metricContainer = new FlinkMetricContainer(this.runtimeContext);
        int subtaskIndex = this.runtimeContext.getIndexOfThisSubtask();
        int numSubtasks = this.runtimeContext.getNumberOfParallelSubtasks();
        this.localSplitSources = new ArrayList<>();
        this.localReaders = new ArrayList<>();
        this.pendingCheckpoints = new LinkedHashMap<>();
        if (this.isRestored) {
            // Typed iteration: a raw Iterable would hand back Object elements and lose the KV type.
            for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored : this.stateForCheckpoint.get()) {
                UnboundedSource<OutputT, CheckpointMarkT> restoredSource = restored.getKey();
                this.localSplitSources.add(restoredSource);
                this.localReaders.add(restoredSource.createReader(this.serializedOptions.get(), restored.getValue()));
            }
        } else {
            for (int i = 0; i < this.splitSources.size(); ++i) {
                if (i % numSubtasks != subtaskIndex) {
                    continue;
                }
                UnboundedSource<OutputT, CheckpointMarkT> source = this.splitSources.get(i);
                UnboundedSource.UnboundedReader<OutputT> reader = source.createReader(this.serializedOptions.get(), null);
                this.localSplitSources.add(source);
                this.localReaders.add(reader);
            }
        }
        LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", subtaskIndex + 1, numSubtasks, this.localSplitSources);
    }

    /**
     * Main read loop of the source.
     *
     * <p>Stores {@code ctx}, schedules the first watermark timer, and then reads in one of
     * three modes:
     * <ul>
     *   <li>no local readers: nothing is read;</li>
     *   <li>converted bounded source: each reader is started and then advanced until it
     *       reports no more data (or the source stops running), one reader after another;</li>
     *   <li>otherwise: all readers are started, then advanced round-robin until the source
     *       stops running or the maximum watermark was reached; a full round without any
     *       data is followed by a 50 ms sleep.</li>
     * </ul>
     * Every start/advance call and the resulting emission happen while holding the checkpoint
     * lock of {@code ctx}. In all modes a {@code Long.MAX_VALUE} watermark is emitted at the end
     * and {@code finalizeSource()} is called.
     *
     * @param ctx Flink source context used for emitting elements and watermarks
     * @throws Exception if a reader fails or the thread is interrupted while sleeping
     */
    public void run(@UnknownKeyFor @NonNull @Initialized SourceFunction.SourceContext<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<OutputT>>> ctx) throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.context = ctx;
        ReaderInvocationUtil readerInvoker = new ReaderInvocationUtil(this.stepName, this.serializedOptions.get(), this.metricContainer);
        this.setNextWatermarkTimer(this.runtimeContext);

        if (this.localReaders.isEmpty()) {
            LOG.info("Number of readers is 0 for this task executor, idle");
        } else if (this.isConvertedBoundedSource) {
            // Bounded input: exhaust each reader completely before moving on to the next one.
            int readerIndex = 0;
            while (readerIndex < this.localReaders.size() && this.isRunning) {
                UnboundedSource.UnboundedReader<OutputT> boundedReader = this.localReaders.get(readerIndex);
                synchronized (ctx.getCheckpointLock()) {
                    if (readerInvoker.invokeStart(boundedReader)) {
                        this.emitElement(ctx, boundedReader);
                    }
                }
                boolean moreData;
                do {
                    synchronized (ctx.getCheckpointLock()) {
                        moreData = readerInvoker.invokeAdvance(boundedReader);
                        if (moreData) {
                            this.emitElement(ctx, boundedReader);
                        }
                    }
                } while (moreData && this.isRunning);
                ++readerIndex;
            }
        } else {
            final int readerCount = this.localReaders.size();
            for (UnboundedSource.UnboundedReader<OutputT> startingReader : this.localReaders) {
                synchronized (ctx.getCheckpointLock()) {
                    if (readerInvoker.invokeStart(startingReader)) {
                        this.emitElement(ctx, startingReader);
                    }
                }
            }

            int cursor = 0;
            boolean sawDataThisRound = false;
            while (this.isRunning && !this.maxWatermarkReached) {
                UnboundedSource.UnboundedReader<OutputT> currentReader = this.localReaders.get(cursor);
                synchronized (ctx.getCheckpointLock()) {
                    if (readerInvoker.invokeAdvance(currentReader)) {
                        this.emitElement(ctx, currentReader);
                        sawDataThisRound = true;
                    }
                }
                cursor = (cursor + 1) % readerCount;
                if (cursor == 0) {
                    // A full pass over all readers just finished.
                    if (sawDataThisRound) {
                        sawDataThisRound = false;
                    } else {
                        // Nobody had data: back off briefly before polling again.
                        Thread.sleep(50L);
                    }
                }
            }
        }

        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
        this.finalizeSource();
    }

    /**
     * Keeps the calling thread parked after reading has finished, until either the source is
     * no longer running or {@code idleTimeoutMs} milliseconds have elapsed since this method
     * was entered.
     *
     * <p>An interrupt received while the source is still running is ignored and the wait
     * continues; an interrupt received after the source stopped is restored on the thread.
     */
    private void finalizeSource() {
        final long idleSince = System.currentTimeMillis();
        while (this.isRunning) {
            if (System.currentTimeMillis() - idleSince >= this.idleTimeoutMs) {
                break;
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                if (!this.isRunning) {
                    // Shutting down: re-assert the interrupt; the loop condition ends the wait.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Emits the reader's current element to {@code ctx}.
     *
     * <p>The element is paired with its record id in a {@link ValueWithRecordId} and wrapped
     * in a {@link WindowedValue} carrying the reader's current timestamp, the global window
     * and {@code PaneInfo.NO_FIRING}. Callers in this class invoke this method while holding
     * the checkpoint lock of {@code ctx}.
     *
     * @param ctx source context that receives the element
     * @param reader reader positioned on the element to emit
     */
    private void emitElement(@UnknownKeyFor @NonNull @Initialized SourceFunction.SourceContext<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<OutputT>>> ctx, @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<OutputT> reader) {
        OutputT item = reader.getCurrent();
        byte[] recordId = reader.getCurrentRecordId();
        Instant timestamp = reader.getCurrentTimestamp();
        // Keep the element type: passing an Object to ctx.collect would not match the context's type.
        WindowedValue<ValueWithRecordId<OutputT>> windowedValue = WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
        ctx.collect(windowedValue);
    }

    /**
     * Releases the resources held by this source.
     *
     * <p>Registers the collected metrics (when a metric container exists), calls
     * {@code super.close()}, and closes every local reader. A failure in any of these steps
     * does not prevent the remaining readers from being closed: the first failure is rethrown
     * once all readers were attempted, with later failures attached as suppressed exceptions.
     * {@code Workarounds.deleteStaticCaches()} always runs last.
     *
     * @throws Exception the first failure encountered while closing
     */
    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Exception firstFailure = null;
        try {
            try {
                if (this.metricContainer != null) {
                    this.metricContainer.registerMetricsForPipelineResult();
                }
                super.close();
            }
            catch (Exception e) {
                // Remember the failure but still close the readers below.
                firstFailure = e;
            }
            if (this.localReaders != null) {
                for (UnboundedSource.UnboundedReader<OutputT> reader : this.localReaders) {
                    try {
                        reader.close();
                    }
                    catch (Exception e) {
                        if (firstFailure == null) {
                            firstFailure = e;
                        } else if (firstFailure != e) {
                            firstFailure.addSuppressed(e);
                        }
                    }
                }
            }
        }
        finally {
            Workarounds.deleteStaticCaches();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Requests termination by clearing the {@code isRunning} flag, which the read loops,
     * the idle wait and the watermark timer all check.
     */
    public void cancel() {
        isRunning = false;
    }

    /**
     * Stops the source; has the same effect as {@code cancel()}, i.e. it clears the
     * {@code isRunning} flag.
     */
    @Override
    public void stop() {
        isRunning = false;
    }

    /**
     * Writes the current checkpoint mark of every local reader into the operator state and
     * remembers the marks so they can be finalized in {@code notifyCheckpointComplete}.
     *
     * <p>Nothing happens when the source is no longer running (only a debug message is logged)
     * or when no checkpoint coder is available. Otherwise the list state is cleared and
     * repopulated with one (source, mark) pair per local reader. Before the new marks are
     * recorded under the checkpoint id, the oldest pending checkpoints are dropped so that at
     * most {@code MAX_NUMBER_PENDING_CHECKPOINTS} entries are retained afterwards.
     *
     * @param functionSnapshotContext supplies the id of the checkpoint being taken
     * @throws Exception if accessing the list state fails
     */
    public void snapshotState(@UnknownKeyFor @NonNull @Initialized FunctionSnapshotContext functionSnapshotContext) throws @UnknownKeyFor @NonNull @Initialized Exception {
        if (!this.isRunning) {
            LOG.debug("snapshotState() called on closed source");
            return;
        }
        if (this.checkpointCoder == null) {
            return;
        }

        this.stateForCheckpoint.clear();
        long checkpointId = functionSnapshotContext.getCheckpointId();
        List<CheckpointMarkT> checkpointMarks = new ArrayList<>(this.localSplitSources.size());
        for (int i = 0; i < this.localSplitSources.size(); ++i) {
            UnboundedSource<OutputT, CheckpointMarkT> source = this.localSplitSources.get(i);
            UnboundedSource.UnboundedReader<OutputT> reader = this.localReaders.get(i);
            // The reader was created by a source parameterized with CheckpointMarkT, so its
            // marks are expected to be of that type; the reader API only exposes the base type.
            @SuppressWarnings("unchecked")
            CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
            checkpointMarks.add(mark);
            KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv = KV.of(source, mark);
            this.stateForCheckpoint.add(kv);
        }

        // Make room for the new entry: evict oldest-first until fewer than the cap remain.
        int excess = this.pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
        if (excess >= 0) {
            Iterator<Long> oldestFirst = this.pendingCheckpoints.keySet().iterator();
            for (; excess >= 0; --excess) {
                oldestFirst.next();
                oldestFirst.remove();
            }
        }
        this.pendingCheckpoints.put(checkpointId, checkpointMarks);
    }

    /**
     * Obtains the operator list state used for checkpointing and records whether this
     * instance starts from restored state.
     *
     * <p>Does nothing when no checkpoint coder is available. Otherwise the list state named
     * {@code "_default_"} is looked up with a serializer derived from the checkpoint coder,
     * and {@code isRestored} is set when the context reports a restore.
     *
     * @param context initialization context providing the operator state store
     * @throws Exception if the list state cannot be obtained
     */
    public void initializeState(@UnknownKeyFor @NonNull @Initialized FunctionInitializationContext context) throws @UnknownKeyFor @NonNull @Initialized Exception {
        if (this.checkpointCoder != null) {
            OperatorStateStore operatorState = context.getOperatorStateStore();
            CoderTypeInformation coderTypeInfo = new CoderTypeInformation(this.checkpointCoder, this.serializedOptions.get());
            ListStateDescriptor descriptor = new ListStateDescriptor("_default_", coderTypeInfo.createSerializer(new ExecutionConfig()));
            this.stateForCheckpoint = operatorState.getListState(descriptor);

            if (!context.isRestored()) {
                LOG.info("No restore state for UnboundedSourceWrapper.");
            } else {
                this.isRestored = true;
                LOG.info("Restoring state in the UnboundedSourceWrapper.");
            }
        }
    }

    /**
     * Timer callback that emits the minimum watermark over all local readers.
     *
     * <p>Runs only while the source is running and holds the checkpoint lock of the stored
     * source context throughout. Readers that report a {@code null} watermark are skipped;
     * with no contributing reader the emitted value is {@code Long.MAX_VALUE}. If the emitted
     * watermark is below {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, the next timer is
     * scheduled; otherwise {@code maxWatermarkReached} is set.
     *
     * @param timestamp time the timer fired for; not read by this method
     */
    public void onProcessingTime(@UnknownKeyFor @NonNull @Initialized long timestamp) {
        if (!this.isRunning) {
            return;
        }
        synchronized (this.context.getCheckpointLock()) {
            long minWatermarkMillis = Long.MAX_VALUE;
            for (UnboundedSource.UnboundedReader<OutputT> reader : this.localReaders) {
                Instant readerWatermark = reader.getWatermark();
                if (readerWatermark != null) {
                    minWatermarkMillis = Math.min(readerWatermark.getMillis(), minWatermarkMillis);
                }
            }
            this.context.emitWatermark(new Watermark(minWatermarkMillis));

            if (minWatermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                this.maxWatermarkReached = true;
            } else {
                this.setNextWatermarkTimer(this.runtimeContext);
            }
        }
    }

    /**
     * Registers this object as a processing-time callback one auto-watermark interval after
     * the current processing time.
     *
     * <p>No timer is registered when the source is not running or when the current processing
     * time already equals {@code Long.MAX_VALUE}. If adding the interval overflows, the timer
     * is registered for {@code Long.MAX_VALUE} instead. The registration happens while holding
     * the checkpoint lock of the stored source context.
     *
     * @param runtime runtime context providing the execution config and the processing-time
     *     service
     */
    private void setNextWatermarkTimer(@UnknownKeyFor @NonNull @Initialized StreamingRuntimeContext runtime) {
        if (!this.isRunning) {
            return;
        }
        long intervalMillis = runtime.getExecutionConfig().getAutoWatermarkInterval();
        synchronized (this.context.getCheckpointLock()) {
            long now = runtime.getProcessingTimeService().getCurrentProcessingTime();
            if (now >= Long.MAX_VALUE) {
                return;
            }
            long candidate = now + intervalMillis;
            // A sum smaller than its operand means the addition wrapped around.
            long triggerAt = candidate < now ? Long.MAX_VALUE : candidate;
            runtime.getProcessingTimeService().registerTimer(triggerAt, (ProcessingTimeCallback)this);
        }
    }

    /**
     * Returns the splits computed in the constructor.
     *
     * @return the internal list of split sources (not a copy)
     */
    @VisibleForTesting
    public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
        return splitSources;
    }

    /**
     * Returns the sources assigned to this subtask by {@code open()}.
     *
     * @return the internal list of local split sources (not a copy)
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
        return localSplitSources;
    }

    /**
     * Returns the readers created for this subtask by {@code open()}.
     *
     * @return the internal list of local readers (not a copy)
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<OutputT>> getLocalReaders() {
        return localReaders;
    }

    /**
     * Reports whether the source is still running.
     *
     * @return {@code false} once {@code cancel()} or {@code stop()} has been called
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized boolean isRunning() {
        return isRunning;
    }

    /**
     * Installs the source context without going through {@code run()}.
     *
     * @param ctx context to use for the checkpoint lock and for watermark emission
     */
    @VisibleForTesting
    public void setSourceContext(@UnknownKeyFor @NonNull @Initialized SourceFunction.SourceContext<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<OutputT>>> ctx) {
        context = ctx;
    }

    /**
     * Finalizes the checkpoint marks recorded for a completed checkpoint.
     *
     * <p>Unknown checkpoint ids are ignored. For a known id, all pending entries up to and
     * including that id are removed (in insertion order) and then each mark recorded for the
     * id is finalized.
     *
     * @param checkpointId id of the checkpoint that completed
     * @throws Exception if finalizing a checkpoint mark fails
     */
    public void notifyCheckpointComplete(@UnknownKeyFor @NonNull @Initialized long checkpointId) throws @UnknownKeyFor @NonNull @Initialized Exception {
        List<CheckpointMarkT> completedMarks = this.pendingCheckpoints.get(checkpointId);
        if (completedMarks == null) {
            return;
        }

        // Drop this checkpoint and everything older than it; the map iterates oldest-first.
        Iterator<Long> pendingIds = this.pendingCheckpoints.keySet().iterator();
        while (true) {
            long pendingId = pendingIds.next();
            pendingIds.remove();
            if (pendingId == checkpointId) {
                break;
            }
        }

        for (UnboundedSource.CheckpointMark mark : completedMarks) {
            mark.finalizeCheckpoint();
        }
    }
}

