/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class StreamOperatorChainingTest {
    private static List<String> sink1Results;
    private static List<String> sink2Results;
    private static List<String> sink3Results;

    @Test
    public void testMultiChainingWithObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        this.testMultiChaining(env);
    }

    @Test
    public void testMultiChainingWithoutObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableObjectReuse();
        this.testMultiChaining(env);
    }

    private void testMultiChaining(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource input = env.fromElements((Object[])new Integer[]{1, 2, 3});
        sink1Results = new ArrayList<String>();
        sink2Results = new ArrayList<String>();
        input = input.map((MapFunction)new MapFunction<Integer, Integer>(){
            private static final long serialVersionUID = 1L;

            public Integer map(Integer value) throws Exception {
                return value;
            }
        });
        input.map((MapFunction)new MapFunction<Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map(Integer value) throws Exception {
                return "First: " + value;
            }
        }).addSink((SinkFunction)new SinkFunction<String>(){
            private static final long serialVersionUID = 1L;

            public void invoke(String value) throws Exception {
                sink1Results.add(value);
            }
        });
        input.map((MapFunction)new MapFunction<Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map(Integer value) throws Exception {
                return "Second: " + value;
            }
        }).addSink((SinkFunction)new SinkFunction<String>(){
            private static final long serialVersionUID = 1L;

            public void invoke(String value) throws Exception {
                sink2Results.add(value);
            }
        });
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Assert.assertTrue((jobGraph.getVerticesSortedTopologicallyFromSources().size() == 2 ? 1 : 0) != 0);
        JobVertex chainedVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Configuration configuration = chainedVertex.getConfiguration();
        StreamConfig streamConfig = new StreamConfig(configuration);
        StreamMap headOperator = (StreamMap)streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
        StreamTask mockTask = this.createMockTask(streamConfig, chainedVertex.getName());
        OperatorChain operatorChain = new OperatorChain(mockTask, (StreamOperator)headOperator, (AccumulatorRegistry.Reporter)Mockito.mock(AccumulatorRegistry.Reporter.class));
        headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
        for (StreamOperator operator : operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.open();
        }
        headOperator.processElement(new StreamRecord((Object)1));
        headOperator.processElement(new StreamRecord((Object)2));
        headOperator.processElement(new StreamRecord((Object)3));
        MatcherAssert.assertThat(sink1Results, (Matcher)Matchers.contains((Object[])new String[]{"First: 1", "First: 2", "First: 3"}));
        MatcherAssert.assertThat(sink2Results, (Matcher)Matchers.contains((Object[])new String[]{"Second: 1", "Second: 2", "Second: 3"}));
    }

    @Test
    public void testMultiChainingWithSplitWithObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        this.testMultiChainingWithSplit(env);
    }

    @Test
    public void testMultiChainingWithSplitWithoutObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableObjectReuse();
        this.testMultiChainingWithSplit(env);
    }

    private void testMultiChainingWithSplit(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource input = env.fromElements((Object[])new Integer[]{1, 2, 3});
        sink1Results = new ArrayList<String>();
        sink2Results = new ArrayList<String>();
        sink3Results = new ArrayList<String>();
        input = input.map((MapFunction)new MapFunction<Integer, Integer>(){
            private static final long serialVersionUID = 1L;

            public Integer map(Integer value) throws Exception {
                return value;
            }
        });
        SplitStream split = input.split((OutputSelector)new OutputSelector<Integer>(){
            private static final long serialVersionUID = 1L;

            public Iterable<String> select(Integer value) {
                if (value.equals(1)) {
                    return Collections.singletonList("one");
                }
                return Collections.singletonList("other");
            }
        });
        split.select(new String[]{"one"}).map((MapFunction)new MapFunction<Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map(Integer value) throws Exception {
                return "First 1: " + value;
            }
        }).addSink((SinkFunction)new SinkFunction<String>(){
            private static final long serialVersionUID = 1L;

            public void invoke(String value) throws Exception {
                sink1Results.add(value);
            }
        });
        split.select(new String[]{"one"}).map((MapFunction)new MapFunction<Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map(Integer value) throws Exception {
                return "First 2: " + value;
            }
        }).addSink((SinkFunction)new SinkFunction<String>(){
            private static final long serialVersionUID = 1L;

            public void invoke(String value) throws Exception {
                sink2Results.add(value);
            }
        });
        split.select(new String[]{"other"}).map((MapFunction)new MapFunction<Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map(Integer value) throws Exception {
                return "Second: " + value;
            }
        }).addSink((SinkFunction)new SinkFunction<String>(){
            private static final long serialVersionUID = 1L;

            public void invoke(String value) throws Exception {
                sink3Results.add(value);
            }
        });
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Assert.assertTrue((jobGraph.getVerticesSortedTopologicallyFromSources().size() == 2 ? 1 : 0) != 0);
        JobVertex chainedVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Configuration configuration = chainedVertex.getConfiguration();
        StreamConfig streamConfig = new StreamConfig(configuration);
        StreamMap headOperator = (StreamMap)streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
        StreamTask mockTask = this.createMockTask(streamConfig, chainedVertex.getName());
        OperatorChain operatorChain = new OperatorChain(mockTask, (StreamOperator)headOperator, (AccumulatorRegistry.Reporter)Mockito.mock(AccumulatorRegistry.Reporter.class));
        headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
        for (StreamOperator operator : operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.open();
        }
        headOperator.processElement(new StreamRecord((Object)1));
        headOperator.processElement(new StreamRecord((Object)2));
        headOperator.processElement(new StreamRecord((Object)3));
        MatcherAssert.assertThat(sink1Results, (Matcher)Matchers.contains((Object[])new String[]{"First 1: 1"}));
        MatcherAssert.assertThat(sink2Results, (Matcher)Matchers.contains((Object[])new String[]{"First 2: 1"}));
        MatcherAssert.assertThat(sink3Results, (Matcher)Matchers.contains((Object[])new String[]{"Second: 2", "Second: 3"}));
    }

    private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(StreamConfig streamConfig, String taskName) {
        Object checkpointLock = new Object();
        MockEnvironment env = new MockEnvironment(taskName, 0x300000L, new MockInputSplitProvider(), 1024);
        StreamTask mockTask = (StreamTask)Mockito.mock(StreamTask.class);
        Mockito.when((Object)mockTask.getName()).thenReturn((Object)"Mock Task");
        Mockito.when((Object)mockTask.getCheckpointLock()).thenReturn(checkpointLock);
        Mockito.when((Object)mockTask.getConfiguration()).thenReturn((Object)streamConfig);
        Mockito.when((Object)mockTask.getEnvironment()).thenReturn((Object)env);
        Mockito.when((Object)mockTask.getExecutionConfig()).thenReturn((Object)new ExecutionConfig().enableObjectReuse());
        try {
            ((StreamTask)Mockito.doAnswer((Answer)new Answer<AbstractStateBackend>((Environment)env){
                final /* synthetic */ Environment val$env;
                {
                    this.val$env = environment;
                }

                public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    String operatorIdentifier = (String)invocationOnMock.getArguments()[0];
                    TypeSerializer keySerializer = (TypeSerializer)invocationOnMock.getArguments()[1];
                    MemoryStateBackend backend = MemoryStateBackend.create();
                    backend.initializeForJob(this.val$env, operatorIdentifier, keySerializer);
                    return backend;
                }
            }).when((Object)mockTask)).createStateBackend((String)org.mockito.Matchers.any(String.class), (TypeSerializer)org.mockito.Matchers.any(TypeSerializer.class));
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        return mockTask;
    }
}

