package org.apache.flink.streaming.graph;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.class */
public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpFilterFunction.class */
    private static class NoOpFilterFunction implements FilterFunction<String> {
        private static final long serialVersionUID = 500005424900187476L;

        private NoOpFilterFunction() {
        }

        public boolean filter(String str) throws Exception {
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpKeySelector.class */
    private static class NoOpKeySelector implements KeySelector<String, String> {
        private static final long serialVersionUID = -96127515593422991L;

        private NoOpKeySelector() {
        }

        public String getKey(String str) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpMapFunction.class */
    private static class NoOpMapFunction implements MapFunction<String, String> {
        private static final long serialVersionUID = 6584823409744624276L;

        private NoOpMapFunction() {
        }

        public String map(String str) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpReduceFunction.class */
    private static class NoOpReduceFunction implements ReduceFunction<String> {
        private static final long serialVersionUID = -8775747640749256372L;

        private NoOpReduceFunction() {
        }

        public String reduce(String str, String str2) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpSinkFunction.class */
    private static class NoOpSinkFunction implements SinkFunction<String> {
        private static final long serialVersionUID = -5654199886203297279L;

        private NoOpSinkFunction() {
        }

        public void invoke(String str) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpSourceFunction.class */
    private static class NoOpSourceFunction implements ParallelSourceFunction<String> {
        private static final long serialVersionUID = -5459224792698512636L;

        private NoOpSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
        }

        public void cancel() {
        }
    }

    @Test
    public void testNodeHashIsDeterministic() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.addSource(new NoOpSourceFunction(), "src0").map(new NoOpMapFunction()).filter(new NoOpFilterFunction()).keyBy(new NoOpKeySelector()).reduce(new NoOpReduceFunction()).name("reduce").map(new NoOpMapFunction()).union(new DataStream[]{createLocalEnvironment.addSource(new NoOpSourceFunction(), "src1").filter(new NoOpFilterFunction()), createLocalEnvironment.addSource(new NoOpSourceFunction(), "src2").filter(new NoOpFilterFunction())}).addSink(new NoOpSinkFunction()).name("sink");
        Map<JobVertexID, String> rememberIds = rememberIds(createLocalEnvironment.getStreamGraph().getJobGraph());
        LocalStreamEnvironment createLocalEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment2.setParallelism(4);
        createLocalEnvironment2.addSource(new NoOpSourceFunction(), "src0").map(new NoOpMapFunction()).filter(new NoOpFilterFunction()).keyBy(new NoOpKeySelector()).reduce(new NoOpReduceFunction()).name("reduce").map(new NoOpMapFunction()).union(new DataStream[]{createLocalEnvironment2.addSource(new NoOpSourceFunction(), "src1").filter(new NoOpFilterFunction()), createLocalEnvironment2.addSource(new NoOpSourceFunction(), "src2").filter(new NoOpFilterFunction())}).addSink(new NoOpSinkFunction()).name("sink");
        verifyIdsEqual(createLocalEnvironment2.getStreamGraph().getJobGraph(), rememberIds);
    }

    @Test
    public void testNodeHashIdenticalSources() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.disableOperatorChaining();
        createLocalEnvironment.addSource(new NoOpSourceFunction()).union(new DataStream[]{createLocalEnvironment.addSource(new NoOpSourceFunction())}).addSink(new NoOpSinkFunction());
        List verticesSortedTopologicallyFromSources = createLocalEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assert.assertTrue(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).isInputVertex());
        Assert.assertTrue(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).isInputVertex());
        Assert.assertNotNull(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getID());
        Assert.assertNotNull(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getID());
        Assert.assertNotEquals(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getID(), ((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getID());
    }

    @Test
    public void testNodeHashAfterSourceUnchaining() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).filter(new NoOpFilterFunction()).addSink(new NoOpSinkFunction());
        JobVertexID id = ((JobVertex) createLocalEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources().get(0)).getID();
        LocalStreamEnvironment createLocalEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment2.setParallelism(4);
        createLocalEnvironment2.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).startNewChain().filter(new NoOpFilterFunction()).addSink(new NoOpSinkFunction());
        Assert.assertNotEquals(id, ((JobVertex) createLocalEnvironment2.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources().get(0)).getID());
    }

    @Test
    public void testNodeHashAfterIntermediateUnchaining() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).name("map").startNewChain().filter(new NoOpFilterFunction()).addSink(new NoOpSinkFunction());
        JobVertex jobVertex = (JobVertex) createLocalEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertTrue(jobVertex.getName().startsWith("map"));
        JobVertexID id = jobVertex.getID();
        LocalStreamEnvironment createLocalEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment2.setParallelism(4);
        createLocalEnvironment2.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).name("map").startNewChain().filter(new NoOpFilterFunction()).startNewChain().addSink(new NoOpSinkFunction());
        JobVertex jobVertex2 = (JobVertex) createLocalEnvironment2.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertEquals("map", jobVertex2.getName());
        Assert.assertNotEquals(id, jobVertex2.getID());
    }

    @Test
    public void testNodeHashIdenticalNodes() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.disableOperatorChaining();
        DataStreamSource addSource = createLocalEnvironment.addSource(new NoOpSourceFunction());
        addSource.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
        addSource.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
        JobGraph jobGraph = createLocalEnvironment.getStreamGraph().getJobGraph();
        HashSet hashSet = new HashSet();
        Iterator it = jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(hashSet.add(((JobVertex) it.next()).getID()));
        }
    }

    @Test
    public void testChangedOperatorName() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.addSource(new NoOpSourceFunction(), "A").map(new NoOpMapFunction());
        JobVertexID id = createLocalEnvironment.getStreamGraph().getJobGraph().getVerticesAsArray()[0].getID();
        LocalStreamEnvironment createLocalEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment2.addSource(new NoOpSourceFunction(), "B").map(new NoOpMapFunction());
        Assert.assertEquals(id, createLocalEnvironment2.getStreamGraph().getJobGraph().getVerticesAsArray()[0].getID());
    }

    @Test
    public void testManualHashAssignment() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.disableOperatorChaining();
        SingleOutputStreamOperator uid = createLocalEnvironment.addSource(new NoOpSourceFunction()).name("source").uid("source");
        uid.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction()).name("sink0").uid("sink0");
        uid.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction()).name("sink1").uid("sink1");
        JobGraph jobGraph = createLocalEnvironment.getStreamGraph().getJobGraph();
        HashSet hashSet = new HashSet();
        Iterator it = jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(hashSet.add(((JobVertex) it.next()).getID()));
        }
        LocalStreamEnvironment createLocalEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment2.setParallelism(4);
        createLocalEnvironment2.disableOperatorChaining();
        SingleOutputStreamOperator uid2 = createLocalEnvironment2.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).name("source").uid("source");
        uid2.map(new NoOpMapFunction()).keyBy(new NoOpKeySelector()).reduce(new NoOpReduceFunction()).addSink(new NoOpSinkFunction()).name("sink0").uid("sink0");
        uid2.map(new NoOpMapFunction()).keyBy(new NoOpKeySelector()).reduce(new NoOpReduceFunction()).addSink(new NoOpSinkFunction()).name("sink1").uid("sink1");
        JobGraph jobGraph2 = createLocalEnvironment2.getStreamGraph().getJobGraph();
        Assert.assertNotEquals(jobGraph.getJobID(), jobGraph2.getJobID());
        for (JobVertex jobVertex : jobGraph2.getVertices()) {
            if (jobVertex.getName().endsWith("source") || jobVertex.getName().endsWith("sink0") || jobVertex.getName().endsWith("sink1")) {
                Assert.assertTrue(hashSet.contains(jobVertex.getID()));
            }
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testManualHashAssignmentCollisionThrowsException() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.disableOperatorChaining();
        createLocalEnvironment.addSource(new NoOpSourceFunction()).uid("source").map(new NoOpMapFunction()).uid("source").addSink(new NoOpSinkFunction());
        createLocalEnvironment.getStreamGraph().getJobGraph();
    }

    @Test
    public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).uid("map").addSink(new NoOpSinkFunction());
        createLocalEnvironment.getStreamGraph().getJobGraph();
    }

    @Test
    public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(4);
        createLocalEnvironment.addSource(new NoOpSourceFunction()).uid("source").map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
        createLocalEnvironment.getStreamGraph().getJobGraph();
    }

    @Test
    public void testUserProvidedHashing() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        List asList = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
        createLocalEnvironment.addSource(new NoOpSourceFunction(), "src").setUidHash((String) asList.get(0)).map(new NoOpMapFunction()).filter(new NoOpFilterFunction()).keyBy(new NoOpKeySelector()).reduce(new NoOpReduceFunction()).name("reduce").setUidHash((String) asList.get(1));
        int i = 1;
        Iterator it = createLocalEnvironment.getStreamGraph().getJobGraph().getVertices().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(((JobVertexID) ((JobVertex) it.next()).getIdAlternatives().get(1)).toString(), asList.get(i));
            i--;
        }
    }

    @Test
    public void testUserProvidedHashingOnChainSupported() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc").keyBy(new NoOpKeySelector()).reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
        createLocalEnvironment.getStreamGraph().getJobGraph();
    }

    private Map<JobVertexID, String> rememberIds(JobGraph jobGraph) {
        HashMap hashMap = new HashMap();
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            hashMap.put(jobVertex.getID(), jobVertex.getName());
        }
        return hashMap;
    }

    private void verifyIdsEqual(JobGraph jobGraph, Map<JobVertexID, String> map) {
        Assert.assertEquals(jobGraph.getNumberOfVertices(), map.size());
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            String str = map.get(jobVertex.getID());
            Assert.assertNotNull(str);
            Assert.assertEquals(str, jobVertex.getName());
        }
    }

    private void verifyIdsNotEqual(JobGraph jobGraph, Map<JobVertexID, String> map) {
        Assert.assertEquals(jobGraph.getNumberOfVertices(), map.size());
        Iterator it = jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            Assert.assertFalse(map.containsKey(((JobVertex) it.next()).getID()));
        }
    }
}
