package org.apache.flink.test.streaming.runtime;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase.class */
public class SourceNAryInputChainingITCase extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).build());

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase$NAryUnionOp.class */
    private static final class NAryUnionOp<T> extends AbstractStreamOperatorV2<T> implements MultipleInputStreamOperator<T> {
        private final int numInputs;

        public NAryUnionOp(StreamOperatorParameters<T> streamOperatorParameters, int i) {
            super(streamOperatorParameters, i);
            this.numInputs = i;
        }

        public List<Input> getInputs() {
            ArrayList arrayList = new ArrayList();
            for (int i = 1; i <= this.numInputs; i++) {
                arrayList.add(new PassThoughInput(this, i));
            }
            return arrayList;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase$NAryUnionOpFactory.class */
    public static final class NAryUnionOpFactory extends AbstractStreamOperatorFactory<Long> {
        private final int numInputs;

        NAryUnionOpFactory(int i) {
            this.numInputs = i;
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> streamOperatorParameters) {
            return new NAryUnionOp(streamOperatorParameters, this.numInputs);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return NAryUnionOp.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase$PassThoughInput.class */
    private static final class PassThoughInput<T> extends AbstractInput<T, T> {
        PassThoughInput(AbstractStreamOperatorV2<T> abstractStreamOperatorV2, int i) {
            super(abstractStreamOperatorV2, i);
        }

        public void processElement(StreamRecord<T> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }
    }

    @Test
    public void testDirectSourcesOnlyChainCreation() throws Exception {
        Assert.assertEquals(1L, sinkAndCompileJobGraph(createProgramWithSourcesOnly()).getNumberOfVertices());
    }

    @Test
    public void testDirectSourcesOnlyExecution() throws Exception {
        verifySequence(DataStreamUtils.collectBoundedStream(createProgramWithSourcesOnly(), "N-Ary Source Chaining Test Program"), 1L, 30L);
    }

    @Test
    public void testMixedInputsChainCreation() throws Exception {
        Assert.assertEquals(3L, sinkAndCompileJobGraph(createProgramWithMixedInputs()).getNumberOfVertices());
    }

    @Test
    public void testMixedInputsExecution() throws Exception {
        verifySequence(DataStreamUtils.collectBoundedStream(createProgramWithMixedInputs(), "N-Ary Source Chaining Test Program"), 1L, 30L);
    }

    @Test
    public void testMixedInputsWithUnionChainCreation() throws Exception {
        Assert.assertEquals(4L, sinkAndCompileJobGraph(createProgramWithUnionInput()).getNumberOfVertices());
    }

    @Test
    public void testMixedInputsWithUnionExecution() throws Exception {
        verifySequence(DataStreamUtils.collectBoundedStream(createProgramWithUnionInput(), "N-Ary Source Chaining Test Program"), 1L, 40L);
    }

    @Test
    public void testMixedInputsWithMultipleUnionsChainCreation() throws Exception {
        Assert.assertEquals(6L, sinkAndCompileJobGraph(createProgramWithMultipleUnionInputs()).getNumberOfVertices());
    }

    @Test
    public void testMixedInputsWithMultipleUnionsExecution() throws Exception {
        verifySequence(DataStreamUtils.collectBoundedStream(createProgramWithMultipleUnionInputs(), "N-Ary Source Chaining Test Program"), 1L, 60L);
    }

    private DataStream<Long> createProgramWithSourcesOnly() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().enableObjectReuse();
        return nAryInputStreamOperation(executionEnvironment.fromSource(new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1"), executionEnvironment.fromSource(new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2"), executionEnvironment.fromSource(new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3"));
    }

    private DataStream<Long> createProgramWithMixedInputs() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().enableObjectReuse();
        DataStreamSource fromSource = executionEnvironment.fromSource(new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1");
        return nAryInputStreamOperation(fromSource.map(l -> {
            return l;
        }), executionEnvironment.fromSource(new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2"), executionEnvironment.fromSource(new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3").map(l2 -> {
            return l2;
        }));
    }

    private DataStream<Long> createProgramWithUnionInput() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().enableObjectReuse();
        return nAryInputStreamOperation(executionEnvironment.fromSource(new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1").map(l -> {
            return l;
        }), executionEnvironment.fromSource(new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2").union(new DataStream[]{executionEnvironment.fromSource(new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3")}), executionEnvironment.fromSource(new NumberSequenceSource(31L, 40L), WatermarkStrategy.noWatermarks(), "source-4"));
    }

    private DataStream<Long> createProgramWithMultipleUnionInputs() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().enableObjectReuse();
        return nAryInputStreamOperation(executionEnvironment.fromSource(new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1").map(l -> {
            return l;
        }), executionEnvironment.fromSource(new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2").union(new DataStream[]{executionEnvironment.fromSource(new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3")}), executionEnvironment.fromSource(new NumberSequenceSource(31L, 40L), WatermarkStrategy.noWatermarks(), "source-4").map(l2 -> {
            return l2;
        }).union(new DataStream[]{executionEnvironment.fromSource(new NumberSequenceSource(41L, 50L), WatermarkStrategy.noWatermarks(), "source-5").map(l3 -> {
            return l3;
        })}), executionEnvironment.fromSource(new NumberSequenceSource(51L, 60L), WatermarkStrategy.noWatermarks(), "source-6"));
    }

    private static DataStream<Long> nAryInputStreamOperation(DataStream<?>... dataStreamArr) {
        StreamExecutionEnvironment executionEnvironment = dataStreamArr[0].getExecutionEnvironment();
        MultipleInputTransformation multipleInputTransformation = new MultipleInputTransformation("MultipleInputOperator", new NAryUnionOpFactory(dataStreamArr.length), Types.LONG, executionEnvironment.getParallelism());
        for (DataStream<?> dataStream : dataStreamArr) {
            multipleInputTransformation.addInput(dataStream.getTransformation());
        }
        multipleInputTransformation.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        executionEnvironment.addOperator(multipleInputTransformation);
        return new MultipleConnectedStreams(executionEnvironment).transform(multipleInputTransformation);
    }

    private static JobGraph sinkAndCompileJobGraph(DataStream<?> dataStream) {
        dataStream.addSink(new DiscardingSink());
        return StreamingJobGraphGenerator.createJobGraph(dataStream.getExecutionEnvironment().getStreamGraph());
    }

    private static void verifySequence(List<Long> list, long j, long j2) {
        if (list.size() != (j2 - j) + 1) {
            Assert.fail(String.format("Expected: Sequence [%d, %d]. Found: %s", Long.valueOf(j), Long.valueOf(j2), list));
        }
        ArrayList arrayList = new ArrayList(list);
        arrayList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        int i = 0;
        long j3 = j;
        while (j3 <= j2) {
            if (j3 != ((Long) arrayList.get(i)).longValue()) {
                Assert.fail(String.format("Expected: Sequence [%d, %d]. Found: %s", Long.valueOf(j), Long.valueOf(j2), arrayList));
            }
            j3++;
            i++;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -594699391:
                if (implMethodName.equals("lambda$createProgramWithMixedInputs$c9b4a309$1")) {
                    z = 5;
                    break;
                }
                break;
            case -594064170:
                if (implMethodName.equals("lambda$createProgramWithMixedInputs$c9b4a2cb$1")) {
                    z = true;
                    break;
                }
                break;
            case 1209857468:
                if (implMethodName.equals("lambda$createProgramWithUnionInput$4027bc1f$1")) {
                    z = false;
                    break;
                }
                break;
            case 1756730033:
                if (implMethodName.equals("lambda$createProgramWithMultipleUnionInputs$4027bc1f$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1756730034:
                if (implMethodName.equals("lambda$createProgramWithMultipleUnionInputs$4027bc1f$2")) {
                    z = PARALLELISM;
                    break;
                }
                break;
            case 1756730035:
                if (implMethodName.equals("lambda$createProgramWithMultipleUnionInputs$4027bc1f$3")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return l;
                    };
                }
                break;
            case UnalignedCheckpointTestBase.BUFFER_PER_CHANNEL /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l2 -> {
                        return l2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l3 -> {
                        return l3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l32 -> {
                        return l32;
                    };
                }
                break;
            case PARALLELISM /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l22 -> {
                        return l22;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SourceNAryInputChainingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l23 -> {
                        return l23;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
