/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.streaming;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.streaming.StreamSources;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BoundedSourceRestoreTest {
    private final @UnknownKeyFor @NonNull @Initialized int numTasks;
    private final @UnknownKeyFor @NonNull @Initialized int numSplits;

    public BoundedSourceRestoreTest(@UnknownKeyFor @NonNull @Initialized int numTasks, @UnknownKeyFor @NonNull @Initialized int numSplits) {
        this.numTasks = numTasks;
        this.numSplits = numSplits;
    }

    @Parameterized.Parameters
    public static @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized Object @UnknownKeyFor @NonNull @Initialized []> data() {
        return Arrays.asList({1, 1}, {1, 2}, {1, 4});
    }

    @Test
    public void testRestore() throws @UnknownKeyFor @NonNull @Initialized Exception {
        int numElements = 102;
        int firstBatchSize = 23;
        int secondBatchSize = 79;
        HashSet emittedElements = new HashSet();
        Object checkpointLock = new Object();
        PipelineOptions options = PipelineOptionsFactory.create();
        BoundedSource source = CountingSource.upTo((long)102L);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter unboundedSource = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(source);
        UnboundedSourceWrapper flinkWrapper = new UnboundedSourceWrapper("stepName", options, (UnboundedSource)unboundedSource, this.numSplits);
        StreamSource sourceOperator = new StreamSource((SourceFunction)flinkWrapper);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)sourceOperator, this.numTasks, this.numTasks, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        boolean readFirstBatchOfElements = false;
        try {
            testHarness.open();
            StreamSources.run(sourceOperator, checkpointLock, new TestStreamStatusMaintainer(), new PartialCollector(emittedElements, 23));
        }
        catch (SuccessException e) {
            readFirstBatchOfElements = true;
        }
        Assert.assertTrue((String)"Did not successfully read first batch of elements.", (boolean)readFirstBatchOfElements);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        ArrayList<Integer> finalizeList = new ArrayList<Integer>();
        TestCountingSource.setFinalizeTracker(finalizeList);
        testHarness.notifyOfCompletedCheckpoint(0L);
        BoundedSource restoredSource = CountingSource.upTo((long)102L);
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter restoredUnboundedSource = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(restoredSource);
        UnboundedSourceWrapper restoredFlinkWrapper = new UnboundedSourceWrapper("stepName", options, (UnboundedSource)restoredUnboundedSource, this.numSplits);
        StreamSource restoredSourceOperator = new StreamSource((SourceFunction)restoredFlinkWrapper);
        AbstractStreamOperatorTestHarness restoredTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator)restoredSourceOperator, this.numTasks, 1, 0);
        restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        restoredTestHarness.initializeState(snapshot);
        boolean readSecondBatchOfElements = false;
        try {
            restoredTestHarness.open();
            StreamSources.run(restoredSourceOperator, checkpointLock, new TestStreamStatusMaintainer(), new PartialCollector(emittedElements, 79));
        }
        catch (SuccessException e) {
            readSecondBatchOfElements = true;
        }
        Assert.assertTrue((String)"Did not successfully read second batch of elements.", (boolean)readSecondBatchOfElements);
        Assert.assertTrue((emittedElements.size() == 102 ? 1 : 0) != 0);
    }

    private static final class TestStreamStatusMaintainer
    implements StreamStatusMaintainer {
        @UnknownKeyFor @NonNull @Initialized StreamStatus currentStreamStatus = StreamStatus.ACTIVE;

        private TestStreamStatusMaintainer() {
        }

        public void toggleStreamStatus(@UnknownKeyFor @NonNull @Initialized StreamStatus streamStatus) {
            if (!this.currentStreamStatus.equals((Object)streamStatus)) {
                this.currentStreamStatus = streamStatus;
            }
        }

        public @UnknownKeyFor @NonNull @Initialized StreamStatus getStreamStatus() {
            return this.currentStreamStatus;
        }
    }

    private static class PartialCollector<@UnknownKeyFor T>
    implements Output<StreamRecord<WindowedValue<ValueWithRecordId<T>>>> {
        private final @UnknownKeyFor @NonNull @Initialized Set<T> emittedElements;
        private final @UnknownKeyFor @NonNull @Initialized int elementsToConsumeLimit;
        private @UnknownKeyFor @NonNull @Initialized int count = 0;

        private PartialCollector(@UnknownKeyFor @NonNull @Initialized Set<T> emittedElements, @UnknownKeyFor @NonNull @Initialized int elementsToConsumeLimit) {
            this.emittedElements = emittedElements;
            this.elementsToConsumeLimit = elementsToConsumeLimit;
        }

        public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Watermark watermark) {
        }

        public <X> void collect(@UnknownKeyFor @NonNull @Initialized OutputTag<X> outputTag, @UnknownKeyFor @NonNull @Initialized StreamRecord<X> streamRecord) {
            this.collect(streamRecord);
        }

        public void emitLatencyMarker(@UnknownKeyFor @NonNull @Initialized LatencyMarker latencyMarker) {
        }

        public void collect(@UnknownKeyFor @NonNull @Initialized StreamRecord<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<T>>> record) {
            this.emittedElements.add(((ValueWithRecordId)((WindowedValue)record.getValue()).getValue()).getValue());
            ++this.count;
            if (this.count >= this.elementsToConsumeLimit) {
                throw new SuccessException();
            }
        }

        public void close() {
        }
    }

    private static class SuccessException
    extends RuntimeException {
        private SuccessException() {
        }
    }
}

