package org.apache.beam.runners.flink.streaming;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.streaming.StreamSources;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Checks that a bounded source, adapted to an unbounded source and run inside a Flink
 * {@link StreamSource} operator, resumes from a snapshot without losing elements.
 *
 * <p>The test reads a first batch of elements, snapshots the operator, restores the snapshot into
 * a freshly built operator, reads the remaining elements and finally asserts that the two runs
 * together produced every distinct element of the source.
 *
 * <p>Each parameter row is {@code {numTasks, numSplits}}.
 */
@RunWith(Parameterized.class)
public class BoundedSourceRestoreTest {
    /** Total number of elements produced by the counting source. */
    private static final int NUM_ELEMENTS = 102;

    /** Number of elements consumed before the snapshot is taken. */
    private static final int FIRST_BATCH_SIZE = 23;

    /** Number of elements that must still be produced after restoring. */
    private static final int SECOND_BATCH_SIZE = NUM_ELEMENTS - FIRST_BATCH_SIZE;

    private static final String STEP_NAME = "stepName";

    private final int numTasks;
    private final int numSplits;

    /**
     * @param numTasks value used for the parallelism settings of the test harnesses
     * @param numSplits number of splits handed to the {@link UnboundedSourceWrapper}
     */
    public BoundedSourceRestoreTest(int numTasks, int numSplits) {
        this.numTasks = numTasks;
        this.numSplits = numSplits;
    }

    /** Returns the parameter rows, each of the form {@code {numTasks, numSplits}}. */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{1, 1}, new Object[]{1, 2}, new Object[]{1, 4});
    }

    @Test
    public void testRestore() throws Exception {
        // Shared by both runs so the final assertion sees the union of everything emitted.
        final Set<Long> emittedElements = new HashSet<>();
        final Object checkpointLock = new Object();
        final PipelineOptions options = PipelineOptionsFactory.create();

        StreamSource<WindowedValue<ValueWithRecordId<Long>>, ?> sourceOperator =
                createSourceOperator(options);
        // Harness arguments: operator, max parallelism, parallelism, subtask index.
        AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> testHarness =
                new AbstractStreamOperatorTestHarness<>(sourceOperator, this.numTasks, this.numTasks, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);

        boolean readFirstBatch =
                runUntilLimit(testHarness, sourceOperator, checkpointLock, emittedElements, FIRST_BATCH_SIZE);
        Assert.assertTrue("Did not successfully read first batch of elements.", readFirstBatch);

        // Snapshot the partially consumed source, then signal that the checkpoint completed.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        TestCountingSource.setFinalizeTracker(new ArrayList<>());
        testHarness.notifyOfCompletedCheckpoint(0L);

        // Build a completely new operator and restore it from the snapshot taken above.
        StreamSource<WindowedValue<ValueWithRecordId<Long>>, ?> restoredSourceOperator =
                createSourceOperator(options);
        AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> restoredTestHarness =
                new AbstractStreamOperatorTestHarness<>(restoredSourceOperator, this.numTasks, 1, 0);
        restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        restoredTestHarness.initializeState(snapshot);

        boolean readSecondBatch =
                runUntilLimit(
                        restoredTestHarness,
                        restoredSourceOperator,
                        checkpointLock,
                        emittedElements,
                        SECOND_BATCH_SIZE);
        Assert.assertTrue("Did not successfully read second batch of elements.", readSecondBatch);

        // The set de-duplicates, so reaching NUM_ELEMENTS means nothing was lost across the restore.
        Assert.assertEquals(
                "Unexpected number of distinct elements after restore.",
                NUM_ELEMENTS,
                emittedElements.size());
    }

    /**
     * Builds a new source operator: a counting source of {@link #NUM_ELEMENTS} elements, adapted to
     * an unbounded source, wrapped for Flink with {@link #numSplits} splits.
     *
     * <p>The second type argument of the operator is left as a wildcard so that the adapter's
     * checkpoint type does not have to be spelled out here.
     */
    private StreamSource<WindowedValue<ValueWithRecordId<Long>>, ?> createSourceOperator(
            PipelineOptions options) throws Exception {
        UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Long> unboundedSource =
                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
                        CountingSource.upTo(NUM_ELEMENTS));
        return new StreamSource<>(
                new UnboundedSourceWrapper<>(STEP_NAME, options, unboundedSource, this.numSplits));
    }

    /**
     * Opens {@code harness} and runs {@code operator}, collecting values into {@code sink} until
     * {@code limit} elements have been collected.
     *
     * @return {@code true} if the collector aborted the run with a {@link SuccessException},
     *     {@code false} if the run returned on its own before the limit was reached
     */
    private static boolean runUntilLimit(
            AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> harness,
            StreamSource<WindowedValue<ValueWithRecordId<Long>>, ?> operator,
            Object checkpointLock,
            Set<Long> sink,
            int limit)
            throws Exception {
        PartialCollector<Long> collector = new PartialCollector<>(sink, limit);
        try {
            harness.open();
            StreamSources.run(operator, checkpointLock, collector);
        } catch (SuccessException expected) {
            // Thrown on purpose by PartialCollector once the limit is reached.
            return true;
        }
        return false;
    }

    /**
     * Output that records every emitted value in a set and aborts the running source by throwing
     * {@link SuccessException} once a fixed number of records has been collected.
     */
    private static class PartialCollector<T>
            implements StreamSources.OutputWrapper<StreamRecord<WindowedValue<ValueWithRecordId<T>>>> {
        private final Set<T> emittedElements;
        private final int elementsToConsumeLimit;
        private int count;

        private PartialCollector(Set<T> emittedElements, int elementsToConsumeLimit) {
            this.emittedElements = emittedElements;
            this.elementsToConsumeLimit = elementsToConsumeLimit;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            // Watermarks are not relevant to this test.
        }

        /** Forwards tagged records to the main {@code collect}; the tag itself is ignored. */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            collect((StreamRecord) streamRecord);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            // Latency markers are not relevant to this test.
        }

        /**
         * Stores the record's innermost value and counts it.
         *
         * @throws SuccessException once {@code elementsToConsumeLimit} records have been collected
         */
        @Override
        public void collect(StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) {
            this.emittedElements.add(streamRecord.getValue().getValue().getValue());
            this.count++;
            if (this.count >= this.elementsToConsumeLimit) {
                throw new SuccessException();
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }

    /** Signals that a {@link PartialCollector} has collected the requested number of records. */
    private static final class SuccessException extends RuntimeException {
        private SuccessException() {
        }
    }
}
