package org.apache.flink.streaming.runtime.operators;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.class */
public class StreamOperatorChainingTest {
    private static List<String> sink1Results;
    private static List<String> sink2Results;
    private static List<String> sink3Results;

    @Test
    public void testMultiChainingWithObjectReuse() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        testMultiChaining(executionEnvironment);
    }

    @Test
    public void testMultiChainingWithoutObjectReuse() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableObjectReuse();
        testMultiChaining(executionEnvironment);
    }

    private void testMultiChaining(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        streamExecutionEnvironment.setParallelism(2);
        DataStreamSource fromElements = streamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3});
        sink1Results = new ArrayList();
        sink2Results = new ArrayList();
        SingleOutputStreamOperator map = fromElements.map(num -> {
            return num;
        });
        map.map(num2 -> {
            return "First: " + num2;
        }).addSink(new SinkFunction<String>() { // from class: org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest.1
            public void invoke(String str, SinkFunction.Context context) throws Exception {
                StreamOperatorChainingTest.sink1Results.add(str);
            }
        });
        map.map(num3 -> {
            return "Second: " + num3;
        }).addSink(new SinkFunction<String>() { // from class: org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest.2
            public void invoke(String str, SinkFunction.Context context) throws Exception {
                StreamOperatorChainingTest.sink2Results.add(str);
            }
        });
        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 2);
        JobVertex jobVertex = (JobVertex) jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        StreamMap streamOperator = streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
        MockEnvironment createMockEnvironment = createMockEnvironment(jobVertex.getName());
        Throwable th = null;
        try {
            StreamTask createMockTask = createMockTask(streamConfig, createMockEnvironment);
            OperatorChain createOperatorChain = createOperatorChain(streamConfig, createMockEnvironment, createMockTask);
            streamOperator.setup(createMockTask, streamConfig, createOperatorChain.getChainEntryPoint());
            for (StreamOperator streamOperator2 : createOperatorChain.getAllOperators()) {
                if (streamOperator2 != null) {
                    streamOperator2.open();
                }
            }
            streamOperator.processElement(new StreamRecord(1));
            streamOperator.processElement(new StreamRecord(2));
            streamOperator.processElement(new StreamRecord(3));
            MatcherAssert.assertThat(sink1Results, Matchers.contains(new String[]{"First: 1", "First: 2", "First: 3"}));
            MatcherAssert.assertThat(sink2Results, Matchers.contains(new String[]{"Second: 1", "Second: 2", "Second: 3"}));
            if (createMockEnvironment != null) {
                if (0 == 0) {
                    createMockEnvironment.close();
                    return;
                }
                try {
                    createMockEnvironment.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createMockEnvironment != null) {
                if (0 != 0) {
                    try {
                        createMockEnvironment.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createMockEnvironment.close();
                }
            }
            throw th3;
        }
    }

    private MockEnvironment createMockEnvironment(String str) {
        return new MockEnvironmentBuilder().setTaskName(str).setMemorySize(3145728L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE).build();
    }

    @Test
    public void testMultiChainingWithSplitWithObjectReuse() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        testMultiChainingWithSplit(executionEnvironment);
    }

    @Test
    public void testMultiChainingWithSplitWithoutObjectReuse() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableObjectReuse();
        testMultiChainingWithSplit(executionEnvironment);
    }

    private void testMultiChainingWithSplit(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        streamExecutionEnvironment.setParallelism(2);
        DataStreamSource fromElements = streamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3});
        sink1Results = new ArrayList();
        sink2Results = new ArrayList();
        sink3Results = new ArrayList();
        SplitStream split = fromElements.map(num -> {
            return num;
        }).split(new OutputSelector<Integer>() { // from class: org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest.3
            private static final long serialVersionUID = 1;

            public Iterable<String> select(Integer num2) {
                return num2.equals(1) ? Collections.singletonList("one") : Collections.singletonList("other");
            }
        });
        split.select(new String[]{"one"}).map(num2 -> {
            return "First 1: " + num2;
        }).addSink(new SinkFunction<String>() { // from class: org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest.4
            public void invoke(String str, SinkFunction.Context context) throws Exception {
                StreamOperatorChainingTest.sink1Results.add(str);
            }
        });
        split.select(new String[]{"one"}).map(num3 -> {
            return "First 2: " + num3;
        }).addSink(new SinkFunction<String>() { // from class: org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest.5
            public void invoke(String str, SinkFunction.Context context) throws Exception {
                StreamOperatorChainingTest.sink2Results.add(str);
            }
        });
        split.select(new String[]{"other"}).map(num4 -> {
            return "Second: " + num4;
        }).addSink(new SinkFunction<String>() { // from class: org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest.6
            public void invoke(String str, SinkFunction.Context context) throws Exception {
                StreamOperatorChainingTest.sink3Results.add(str);
            }
        });
        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 2);
        JobVertex jobVertex = (JobVertex) jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        StreamMap streamOperator = streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
        MockEnvironment createMockEnvironment = createMockEnvironment(jobVertex.getName());
        Throwable th = null;
        try {
            try {
                StreamTask createMockTask = createMockTask(streamConfig, createMockEnvironment);
                OperatorChain createOperatorChain = createOperatorChain(streamConfig, createMockEnvironment, createMockTask);
                streamOperator.setup(createMockTask, streamConfig, createOperatorChain.getChainEntryPoint());
                for (StreamOperator streamOperator2 : createOperatorChain.getAllOperators()) {
                    if (streamOperator2 != null) {
                        streamOperator2.open();
                    }
                }
                streamOperator.processElement(new StreamRecord(1));
                streamOperator.processElement(new StreamRecord(2));
                streamOperator.processElement(new StreamRecord(3));
                MatcherAssert.assertThat(sink1Results, Matchers.contains(new String[]{"First 1: 1"}));
                MatcherAssert.assertThat(sink2Results, Matchers.contains(new String[]{"First 2: 1"}));
                MatcherAssert.assertThat(sink3Results, Matchers.contains(new String[]{"Second: 2", "Second: 3"}));
                if (createMockEnvironment != null) {
                    if (0 == 0) {
                        createMockEnvironment.close();
                        return;
                    }
                    try {
                        createMockEnvironment.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createMockEnvironment != null) {
                if (th != null) {
                    try {
                        createMockEnvironment.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createMockEnvironment.close();
                }
            }
            throw th4;
        }
    }

    private <IN, OT extends StreamOperator<IN>> OperatorChain<IN, OT> createOperatorChain(StreamConfig streamConfig, Environment environment, StreamTask<IN, OT> streamTask) {
        return new OperatorChain<>(streamTask, StreamTask.createRecordWriters(streamConfig, environment));
    }

    private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(StreamConfig streamConfig, Environment environment) throws Exception {
        return new MockStreamTaskBuilder(environment).setConfig(streamConfig).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1910555286:
                if (implMethodName.equals("lambda$testMultiChaining$e1faee96$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1910555285:
                if (implMethodName.equals("lambda$testMultiChaining$e1faee96$2")) {
                    z = 5;
                    break;
                }
                break;
            case -1910555284:
                if (implMethodName.equals("lambda$testMultiChaining$e1faee96$3")) {
                    z = 3;
                    break;
                }
                break;
            case 1688115724:
                if (implMethodName.equals("lambda$testMultiChainingWithSplit$e1faee96$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1688115725:
                if (implMethodName.equals("lambda$testMultiChainingWithSplit$e1faee96$2")) {
                    z = 4;
                    break;
                }
                break;
            case 1688115726:
                if (implMethodName.equals("lambda$testMultiChainingWithSplit$e1faee96$3")) {
                    z = false;
                    break;
                }
                break;
            case 1688115727:
                if (implMethodName.equals("lambda$testMultiChainingWithSplit$e1faee96$4")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num3 -> {
                        return "First 2: " + num3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num4 -> {
                        return "Second: " + num4;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num32 -> {
                        return "Second: " + num32;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num2 -> {
                        return "First 1: " + num2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num22 -> {
                        return "First: " + num22;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num5 -> {
                        return num5;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
