package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.runtime.tasks.StreamTaskSelectiveReadingTest;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.class */
public class StreamTaskSelectiveReadingITCase {

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase$TestIntegerSource.class */
    /**
     * {@link TestSource} over {@code Integer} elements. Every emitted value is the configured
     * element multiplied by {@code (index + 1)}, where {@code index} is the value handed to
     * {@link #outValue(Integer, int)} by the base class (the subtask index).
     */
    private static class TestIntegerSource extends TestSource<Integer> {
        /**
         * @param str name of this source, stored by the base class
         * @param numArr elements to emit, in order
         */
        public TestIntegerSource(String str, Integer[] numArr) {
            super(str, numArr);
        }

        /**
         * Scales an element by the one-based form of the given index.
         *
         * @param num element taken from the configured array; must not be {@code null}
         * @param i zero-based index passed in by the caller
         * @return {@code num * (i + 1)}
         */
        @Override // org.apache.flink.test.streaming.runtime.StreamTaskSelectiveReadingITCase.TestSource
        public Integer outValue(Integer num, int i) {
            final int multiplier = i + 1;
            // Unboxing and re-boxing are left to the compiler.
            return num * multiplier;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase$TestSource.class */
    /**
     * Base class for the finite test sources in this file. It emits a fixed array of elements
     * once, in array order, passing each element through {@link #outValue(Object, int)}
     * together with the index of the running subtask before collecting it.
     *
     * @param <T> type of the emitted elements
     */
    public static abstract class TestSource<T> extends RichParallelSourceFunction<T> {
        private static final long serialVersionUID = 1L;

        /** Name given at construction time; available to subclasses. */
        protected final String name;

        /** Cleared by {@link #cancel()} to make {@link #run} stop before the next element. */
        private volatile boolean running = true;

        /** Runtime context captured in {@link #open}; not serialized with the function. */
        private transient RuntimeContext context;

        /** Elements to emit, in order. */
        private final T[] elements;

        /**
         * @param str name of this source
         * @param tArr elements to emit, in order
         */
        public TestSource(String str, T[] tArr) {
            this.elements = tArr;
            this.name = str;
        }

        /** Captures the runtime context so that {@link #run} can query the subtask index. */
        public void open(Configuration configuration) throws Exception {
            this.context = getRuntimeContext();
        }

        /**
         * Emits the configured elements one by one until they are exhausted or the source is
         * cancelled. Each element is transformed and collected while holding the checkpoint
         * lock of the given source context.
         *
         * @param sourceContext context used to obtain the checkpoint lock and to emit elements
         */
        public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
            // The running flag is tested first, exactly as in the loop condition it replaces.
            for (int position = 0; this.running && position < this.elements.length; position++) {
                synchronized (sourceContext.getCheckpointLock()) {
                    final T transformed =
                            outValue(this.elements[position], this.context.getIndexOfThisSubtask());
                    sourceContext.collect(transformed);
                }
            }
        }

        /** Requests that {@link #run} stop before emitting the next element. */
        public void cancel() {
            this.running = false;
        }

        /**
         * Maps a configured element to the value that is actually emitted.
         *
         * @param t element taken from the configured array
         * @param i index of the subtask that is emitting the element
         * @return value to hand to the source context
         */
        protected abstract T outValue(T t, int i);
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase$TestStringSource.class */
    /**
     * {@link TestSource} over {@code String} elements. Every emitted value is the configured
     * element prefixed with {@code "[<name>-<index>]: "}.
     */
    private static class TestStringSource extends TestSource<String> {
        /**
         * @param str name of this source, used in the prefix of every emitted value
         * @param strArr elements to emit, in order
         */
        public TestStringSource(String str, String[] strArr) {
            super(str, strArr);
        }

        /**
         * Prefixes an element with the source name and the given index.
         *
         * @param str element taken from the configured array
         * @param i index passed in by the caller
         * @return {@code "[" + name + "-" + i + "]: " + str}
         */
        @Override // org.apache.flink.test.streaming.runtime.StreamTaskSelectiveReadingITCase.TestSource
        public String outValue(String str, int i) {
            final StringBuilder tagged = new StringBuilder("[");
            tagged.append(this.name).append("-").append(i);
            tagged.append("]: ").append(str);
            return tagged.toString();
        }
    }

    /**
     * Runs a job that connects a string source (parallelism 1) and an integer source
     * (parallelism 2) to a two-input {@code SequentialReadingStreamOperator}, then checks the
     * records collected by the sink.
     *
     * <p>The assertions require that the first six results are the six string records, in
     * emission order and tagged {@code [Operator0-1]}, and that the remaining six results are
     * the integer records tagged {@code [Operator0-2]}. The integer records are compared after
     * sorting, so their relative order is not asserted; they come from two subtasks
     * (index 0 emits 1, 2, 3 and index 1 emits 2, 4, 6, see {@code TestIntegerSource}).
     *
     * <p>NOTE(review): the tagging and the "input 1 before input 2" ordering are presumably
     * produced by {@code StreamTaskSelectiveReadingTest.SequentialReadingStreamOperator}, which
     * is defined outside this file - confirm there.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testSequentialReading() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<String> stringSource = executionEnvironment.addSource(
                new TestStringSource("Source0", new String[]{"Hello-1", "Hello-2", "Hello-3", "Hello-4", "Hello-5", "Hello-6"}));
        DataStreamSource<Integer> integerSource = executionEnvironment.addSource(
                new TestIntegerSource("Source1", new Integer[]{1, 2, 3})).setParallelism(2);
        TestListResultSink<String> resultSink = new TestListResultSink<String>();

        StreamTaskSelectiveReadingTest.SequentialReadingStreamOperator sequentialReadingStreamOperator =
                new StreamTaskSelectiveReadingTest.SequentialReadingStreamOperator("Operator0");
        sequentialReadingStreamOperator.setChainingStrategy(ChainingStrategy.NEVER);

        stringSource.connect(integerSource)
                .transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, sequentialReadingStreamOperator)
                .addSink(resultSink);
        executionEnvironment.execute("Selective reading test");

        List<String> result = resultSink.getResult();

        List<String> expectedFromStrings = Arrays.asList(
                "[Operator0-1]: [Source0-0]: Hello-1",
                "[Operator0-1]: [Source0-0]: Hello-2",
                "[Operator0-1]: [Source0-0]: Hello-3",
                "[Operator0-1]: [Source0-0]: Hello-4",
                "[Operator0-1]: [Source0-0]: Hello-5",
                "[Operator0-1]: [Source0-0]: Hello-6");
        List<String> expectedFromIntegers = Arrays.asList(
                "[Operator0-2]: 1",
                "[Operator0-2]: 2",
                "[Operator0-2]: 3",
                "[Operator0-2]: 2",
                "[Operator0-2]: 4",
                "[Operator0-2]: 6");
        // Both the expected and the actual integer records are sorted before comparison.
        Collections.sort(expectedFromIntegers);

        Assert.assertEquals(expectedFromStrings.size() + expectedFromIntegers.size(), result.size());
        Assert.assertEquals(expectedFromStrings, result.subList(0, expectedFromStrings.size()));

        // subList is a view: sorting it reorders the tail of 'result' in place.
        List<String> actualFromIntegers = result.subList(
                expectedFromStrings.size(), expectedFromStrings.size() + expectedFromIntegers.size());
        Collections.sort(actualFromIntegers);
        Assert.assertEquals(expectedFromIntegers, actualFromIntegers);
    }
}
