package org.apache.beam.runners.flink.streaming;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.class */
public class UnboundedSourceWrapperTest {
    private final int numTasks;
    private final int numSplits;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest$SuccessException.class */
    public static class SuccessException extends RuntimeException {
        private SuccessException() {
        }
    }

    public UnboundedSourceWrapperTest(int i, int i2) {
        this.numTasks = i;
        this.numSplits = i2;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{1, 1}, new Object[]{1, 2}, new Object[]{1, 4}, new Object[]{2, 1}, new Object[]{2, 2}, new Object[]{2, 4}, new Object[]{4, 1}, new Object[]{4, 2}, new Object[]{4, 4});
    }

    @Test
    public void testReaders() throws Exception {
        Object obj = new Object();
        UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper(PipelineOptionsFactory.create(), new TestCountingSource(20), this.numSplits);
        Assert.assertEquals(this.numSplits, unboundedSourceWrapper.getSplitSources().size());
        StreamSource streamSource = new StreamSource(unboundedSourceWrapper);
        setupSourceOperator(streamSource, this.numTasks);
        try {
            streamSource.open();
            streamSource.run(obj, new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { // from class: org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest.1
                private int count = 0;

                public void emitWatermark(Watermark watermark) {
                }

                public void collect(StreamRecord<WindowedValue<KV<Integer, Integer>>> streamRecord) {
                    this.count++;
                    if (this.count >= 20) {
                        throw new SuccessException();
                    }
                }

                public void close() {
                }
            });
            Assert.fail("Read terminated without producing expected number of outputs");
        } catch (SuccessException e) {
            Assert.assertEquals(Math.max(1, this.numSplits / this.numTasks), unboundedSourceWrapper.getLocalSplitSources().size());
        }
    }

    @Test
    public void testRestore() throws Exception {
        Object obj = new Object();
        PipelineOptions create = PipelineOptionsFactory.create();
        UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper(create, new TestCountingSource(20), this.numSplits);
        Assert.assertEquals(this.numSplits, unboundedSourceWrapper.getSplitSources().size());
        StreamSource streamSource = new StreamSource(unboundedSourceWrapper);
        setupSourceOperator(streamSource, this.numTasks);
        final HashSet hashSet = new HashSet();
        boolean z = false;
        try {
            streamSource.open();
            streamSource.run(obj, new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { // from class: org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest.2
                private int count = 0;

                public void emitWatermark(Watermark watermark) {
                }

                public void collect(StreamRecord<WindowedValue<KV<Integer, Integer>>> streamRecord) {
                    hashSet.add(((WindowedValue) streamRecord.getValue()).getValue());
                    this.count++;
                    if (this.count >= 10) {
                        throw new SuccessException();
                    }
                }

                public void close() {
                }
            });
        } catch (SuccessException e) {
            z = true;
        }
        Assert.assertTrue("Did not successfully read first batch of elements.", z);
        byte[] snapshotState = unboundedSourceWrapper.snapshotState(0L, 0L);
        TestCountingSource.setFinalizeTracker(new ArrayList());
        unboundedSourceWrapper.notifyCheckpointComplete(0L);
        Assert.assertEquals(unboundedSourceWrapper.getLocalSplitSources().size(), r0.size());
        UnboundedSourceWrapper unboundedSourceWrapper2 = new UnboundedSourceWrapper(create, new TestCountingSource(20), this.numSplits);
        Assert.assertEquals(this.numSplits, unboundedSourceWrapper2.getSplitSources().size());
        StreamSource streamSource2 = new StreamSource(unboundedSourceWrapper2);
        setupSourceOperator(streamSource2, this.numTasks);
        unboundedSourceWrapper2.restoreState(snapshotState);
        boolean z2 = false;
        try {
            streamSource2.open();
            streamSource2.run(obj, new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { // from class: org.apache.beam.runners.flink.streaming.UnboundedSourceWrapperTest.3
                private int count = 0;

                public void emitWatermark(Watermark watermark) {
                }

                public void collect(StreamRecord<WindowedValue<KV<Integer, Integer>>> streamRecord) {
                    hashSet.add(((WindowedValue) streamRecord.getValue()).getValue());
                    this.count++;
                    if (this.count >= 10) {
                        throw new SuccessException();
                    }
                }

                public void close() {
                }
            });
        } catch (SuccessException e2) {
            z2 = true;
        }
        Assert.assertEquals(Math.max(1, this.numSplits / this.numTasks), unboundedSourceWrapper.getLocalSplitSources().size());
        Assert.assertTrue("Did not successfully read second batch of elements.", z2);
        Assert.assertTrue(hashSet.size() == 20);
    }

    private static <T> void setupSourceOperator(StreamSource<T, ?> streamSource, int i) {
        ExecutionConfig executionConfig = new ExecutionConfig();
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        DummyEnvironment dummyEnvironment = new DummyEnvironment("MockTwoInputTask", i, 0);
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.getName()).thenReturn("Mock Task");
        Mockito.when(streamTask.getCheckpointLock()).thenReturn(new Object());
        Mockito.when(streamTask.getConfiguration()).thenReturn(streamConfig);
        Mockito.when(streamTask.getEnvironment()).thenReturn(dummyEnvironment);
        Mockito.when(streamTask.getExecutionConfig()).thenReturn(executionConfig);
        Mockito.when(streamTask.getAccumulatorMap()).thenReturn(Collections.emptyMap());
        streamSource.setup(streamTask, streamConfig, (Output) Mockito.mock(Output.class));
    }
}
