package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector;
import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/IterateITCase.class */
public class IterateITCase extends AbstractTestBase {
    /**
     * One flag per subtask index. A slot is set to {@code true} by {@code IterationHead} when it
     * receives a {@code true} record; {@code testSimpleIteration} re-allocates the array before
     * each run and asserts every slot afterwards.
     */
    private static boolean[] iterated;
    /**
     * Slot count reported by {@code miniClusterResource}, which is declared outside this file
     * (presumably inherited from {@code AbstractTestBase} - confirm). The tests use it to size
     * inputs and to derive operator parallelism.
     */
    private int parallelism = miniClusterResource.getNumberSlots();
    /** Logger for this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(IterateITCase.class);
    /** Co-map that stringifies the integer input and passes the string input through unchanged. */
    public static CoMapFunction<Integer, String, String> noOpCoMap = new CoMapFunction<Integer, String, String>() { // from class: org.apache.flink.test.streaming.runtime.IterateITCase.5
        public String map1(Integer num) throws Exception {
            return num.toString();
        }

        public String map2(String str) throws Exception {
            return str;
        }
    };
    /** Integer map backed by {@code NoOpIntMap} (defined elsewhere; by its name an identity map). */
    public static MapFunction<Integer, Integer> noOpIntMap = new NoOpIntMap();
    /** Identity map over strings. */
    public static MapFunction<String, String> noOpStrMap = new MapFunction<String, String>() { // from class: org.apache.flink.test.streaming.runtime.IterateITCase.6
        public String map(String str) throws Exception {
            return str;
        }
    };
    /** Co-map that returns whichever integer input it receives, unchanged. */
    public static CoMapFunction<Integer, Integer, Integer> noOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() { // from class: org.apache.flink.test.streaming.runtime.IterateITCase.7
        public Integer map1(Integer num) throws Exception {
            return num;
        }

        public Integer map2(Integer num) throws Exception {
            return num;
        }
    };
    /** Identity map over booleans. */
    public static MapFunction<Boolean, Boolean> noOpBoolMap = new MapFunction<Boolean, Boolean>() { // from class: org.apache.flink.test.streaming.runtime.IterateITCase.8
        public Boolean map(Boolean bool) throws Exception {
            return bool;
        }
    };

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IterateITCase$IterationHead.class */
    private static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
        private IterationHead() {
        }

        public void flatMap(Boolean bool, Collector<Boolean> collector) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            if (bool.booleanValue()) {
                IterateITCase.iterated[indexOfThisSubtask] = true;
            } else {
                collector.collect(true);
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Boolean) obj, (Collector<Boolean>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IterateITCase$TestSink.class */
    private static class TestSink implements SinkFunction<String> {
        private static final long serialVersionUID = 1;
        public static List<String> collected = new ArrayList();

        private TestSink() {
        }

        public void invoke(String str) throws Exception {
            collected.add(str);
        }
    }

    /**
     * Opens an iteration directly on the element source, closes it with a mapped stream and
     * prints the result; the test passes only if an {@link UnsupportedOperationException} is
     * thrown along the way.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testIncorrectParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 10);

        IterativeStream<Integer> loop = source.iterate();
        DataStream<Integer> loopBody = loop.map(noOpIntMap);
        loop.closeWith(loopBody).print();
    }

    /**
     * Closes the same iteration twice, each time with a fresh mapped stream, and expects no
     * exception.
     */
    @Test
    public void testDoubleClosing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The extra map sits between the element source and the iteration head.
        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> loop = source.iterate();
        DataStream<Integer> firstFeedback = loop.map(noOpIntMap);
        loop.closeWith(firstFeedback);
        DataStream<Integer> secondFeedback = loop.map(noOpIntMap);
        loop.closeWith(secondFeedback);
    }

    /**
     * Closes an iteration with a feedback stream whose parallelism was set to half of
     * {@code parallelism}; expects an {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testDifferingParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> loop = source.iterate();
        DataStream<Integer> halvedFeedback = loop.map(noOpIntMap).setParallelism(parallelism / 2);
        loop.closeWith(halvedFeedback);
    }

    /**
     * Co-iteration variant of the differing-parallelism check: the feedback stream of a connected
     * iteration gets half of {@code parallelism}; expects an
     * {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testCoDifferingParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream.ConnectedIterativeStreams<Integer, Integer> coLoop =
                source.iterate().withFeedbackType(Integer.class);
        DataStream<Integer> halvedFeedback = coLoop.map(noOpIntCoMap).setParallelism(parallelism / 2);
        coLoop.closeWith(halvedFeedback);
    }

    /**
     * Creates two separate iterations on the same stream and closes the first one with a stream
     * derived from the second; expects an {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testClosingFromOutOfLoop() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        // Creation order is kept: the loop being closed is opened before the foreign one.
        IterativeStream<Integer> closedLoop = source.iterate();
        IterativeStream<Integer> foreignLoop = source.iterate();
        closedLoop.closeWith(foreignLoop.map(noOpIntMap));
    }

    /**
     * Closes a connected iteration with a stream that was derived from a different iteration on
     * the same input; expects an {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testCoIterClosingFromOutOfLoop() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        // Creation order is kept: the connected loop is opened before the foreign one.
        IterativeStream.ConnectedIterativeStreams<Integer, Integer> coLoop =
                source.iterate().withFeedbackType(Integer.class);
        IterativeStream<Integer> foreignLoop = source.iterate();
        coLoop.closeWith(foreignLoop.map(noOpIntMap));
    }

    /**
     * Executes a job whose iteration is never closed with a feedback stream; expects an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testExecutionWithEmptyIteration() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> openLoop = source.iterate();
        openLoop.map(noOpIntMap).print();

        env.execute();
    }

    /**
     * Derives a connected iteration from a plain iteration, closes both, and checks that the
     * stream graph contains two iteration source/sink pairs. For each pair, the target of the
     * source's first out-edge must equal the origin of the sink's first in-edge.
     */
    @Test
    public void testImmutabilityWithCoiteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> plainLoop = source.iterate();
        IterativeStream.ConnectedIterativeStreams<Integer, String> coLoop =
                plainLoop.withFeedbackType(String.class);

        plainLoop.closeWith(plainLoop.map(noOpIntMap)).print();
        coLoop.closeWith(coLoop.map(noOpCoMap)).print();

        StreamGraph graph = env.getStreamGraph();
        Assert.assertEquals(2, graph.getIterationSourceSinkPairs().size());

        for (Tuple2<StreamNode, StreamNode> sourceSinkPair : graph.getIterationSourceSinkPairs()) {
            StreamNode firstHeadTarget = sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex();
            StreamNode firstTailOrigin = sourceSinkPair.f1.getInEdges().get(0).getSourceVertex();
            Assert.assertEquals(firstHeadTarget, firstTailOrigin);
        }
    }

    /**
     * Builds one iteration over the union of two inputs, attaches four consumers to its head and
     * closes it with the union of three feedback streams. It then checks the generated graphs:
     * a single iteration source/sink pair, the edge counts, the partitioners on selected edges,
     * and that the iteration source and sink job vertices share a co-location group.
     */
    @Test
    public void testmultipleHeadsTailsSimple() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> shuffledInput =
                env.fromElements(1, 2, 3, 4, 5).shuffle().map(noOpIntMap).name("ParallelizeMapShuffle");
        DataStream<Integer> plainInput =
                env.fromElements(1, 2, 3, 4, 5).map(noOpIntMap).name("ParallelizeMapRebalance");
        IterativeStream<Integer> loop = shuffledInput.union(plainInput).iterate();

        DataStream<Integer> rebalanceHead =
                loop.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
        DataStream<Integer> forwardHead = loop.map(noOpIntMap).name("IterForwardMap");
        loop.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
        loop.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());

        DataStream<Integer> evenFeedback = env.fromElements(1, 2, 3, 4, 5)
                .map(noOpIntMap).name("EvenOddSourceMap")
                .split(new EvenOddOutputSelector())
                .select("even");
        DataStream<Integer> broadcastFeedback = rebalanceHead.rebalance().map(noOpIntMap).broadcast();
        DataStream<Integer> shuffledFeedback = forwardHead.shuffle();
        loop.closeWith(evenFeedback.union(broadcastFeedback, shuffledFeedback));

        StreamGraph graph = env.getStreamGraph();
        JobGraph jobGraph = graph.getJobGraph();

        Assert.assertEquals(1, graph.getIterationSourceSinkPairs().size());
        Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
        StreamNode iterationSource = sourceSinkPair.f0;
        StreamNode iterationSink = sourceSinkPair.f1;

        Assert.assertEquals(4, iterationSource.getOutEdges().size());
        Assert.assertEquals(3, iterationSink.getInEdges().size());
        Assert.assertEquals(iterationSource.getParallelism(), iterationSink.getParallelism());

        for (StreamEdge outEdge : iterationSource.getOutEdges()) {
            String targetName = outEdge.getTargetVertex().getOperatorName();
            if (targetName.equals("IterRebalanceMap")) {
                Assert.assertTrue(outEdge.getPartitioner() instanceof RebalancePartitioner);
            } else if (targetName.equals("IterForwardMap")) {
                Assert.assertTrue(outEdge.getPartitioner() instanceof ForwardPartitioner);
            }
        }

        for (StreamEdge inEdge : iterationSink.getInEdges()) {
            String originName = graph.getStreamNode(inEdge.getSourceId()).getOperatorName();
            if (originName.equals("ParallelizeMapShuffle")) {
                Assert.assertTrue(inEdge.getPartitioner() instanceof ShufflePartitioner);
            } else if (originName.equals("ParallelizeMapForward")) {
                // NOTE(review): no operator in this method is given this name.
                Assert.assertTrue(inEdge.getPartitioner() instanceof ForwardPartitioner);
            } else if (originName.equals("EvenOddSourceMap")) {
                Assert.assertTrue(inEdge.getPartitioner() instanceof ForwardPartitioner);
                Assert.assertTrue(inEdge.getSelectedNames().contains("even"));
            }
        }

        // The last vertex matching each name wins, as in a plain scan over all vertices.
        JobVertex sourceVertex = null;
        JobVertex sinkVertex = null;
        for (JobVertex candidate : jobGraph.getVertices()) {
            String vertexName = candidate.getName();
            if (vertexName.contains("IterationSource")) {
                sourceVertex = candidate;
            } else if (vertexName.contains("IterationSink")) {
                sinkVertex = candidate;
            }
        }

        Assert.assertNotNull(sourceVertex.getCoLocationGroup());
        Assert.assertEquals(sourceVertex.getCoLocationGroup(), sinkVertex.getCoLocationGroup());
    }

    /**
     * Variant of the multiple-heads/tails check in which the feedback streams are partitioned
     * after an additional map. It verifies the single iteration source/sink pair, the edge
     * counts, the partitioner and parallelism on named edges, and that the iteration source and
     * sink job vertices both have a co-location group and share it.
     */
    @Test
    public void testmultipleHeadsTailsWithTailPartitioning() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> shuffledInput = env.fromElements(1, 2, 3, 4, 5).shuffle().map(noOpIntMap);
        DataStream<Integer> plainInput = env.fromElements(1, 2, 3, 4, 5).map(noOpIntMap);
        IterativeStream<Integer> loop = shuffledInput.union(plainInput).iterate();

        DataStream<Integer> forwardHead = loop.map(noOpIntMap).name("map1");
        DataStream<Integer> rebalancedHead =
                loop.map(noOpIntMap).setParallelism(parallelism / 2).name("shuffle").rebalance();
        loop.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
        loop.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());

        DataStream<Integer> evenFeedback = env.fromElements(1, 2, 3, 4, 5)
                .map(noOpIntMap).name("split")
                .split(new EvenOddOutputSelector())
                .select("even");
        DataStream<Integer> broadcastFeedback = forwardHead.map(noOpIntMap).name("bc").broadcast();
        DataStream<Integer> shuffledFeedback = rebalancedHead.map(noOpIntMap).shuffle();
        loop.closeWith(evenFeedback.union(broadcastFeedback, shuffledFeedback));

        StreamGraph graph = env.getStreamGraph();
        JobGraph jobGraph = graph.getJobGraph();

        Assert.assertEquals(1, graph.getIterationSourceSinkPairs().size());
        Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
        StreamNode iterationSource = sourceSinkPair.f0;
        StreamNode iterationSink = sourceSinkPair.f1;

        Assert.assertEquals(4, iterationSource.getOutEdges().size());
        Assert.assertEquals(3, iterationSink.getInEdges().size());
        Assert.assertEquals(iterationSource.getParallelism(), iterationSink.getParallelism());

        for (StreamEdge outEdge : iterationSource.getOutEdges()) {
            StreamNode target = outEdge.getTargetVertex();
            String targetName = target.getOperatorName();
            if (targetName.equals("map1")) {
                Assert.assertTrue(outEdge.getPartitioner() instanceof ForwardPartitioner);
                Assert.assertEquals(4, target.getParallelism());
            } else if (targetName.equals("shuffle")) {
                Assert.assertTrue(outEdge.getPartitioner() instanceof RebalancePartitioner);
                Assert.assertEquals(2, target.getParallelism());
            }
        }

        for (StreamEdge inEdge : iterationSink.getInEdges()) {
            String originName = inEdge.getSourceVertex().getOperatorName();
            if (originName.equals("split")) {
                Assert.assertTrue(inEdge.getPartitioner() instanceof ForwardPartitioner);
                Assert.assertTrue(inEdge.getSelectedNames().contains("even"));
            } else if (originName.equals("bc")) {
                Assert.assertTrue(inEdge.getPartitioner() instanceof BroadcastPartitioner);
            } else if (originName.equals("shuffle")) {
                Assert.assertTrue(inEdge.getPartitioner() instanceof ShufflePartitioner);
            }
        }

        // The last vertex matching each name wins, as in a plain scan over all vertices.
        JobVertex sourceVertex = null;
        JobVertex sinkVertex = null;
        for (JobVertex candidate : jobGraph.getVertices()) {
            String vertexName = candidate.getName();
            if (vertexName.contains("IterationSource")) {
                sourceVertex = candidate;
            } else if (vertexName.contains("IterationSink")) {
                sinkVertex = candidate;
            }
        }

        Assert.assertNotNull(sourceVertex.getCoLocationGroup());
        Assert.assertNotNull(sinkVertex.getCoLocationGroup());
        Assert.assertEquals(sourceVertex.getCoLocationGroup(), sinkVertex.getCoLocationGroup());
    }

    /**
     * Runs a boolean iteration job and asserts that every slot of {@code iterated} was set,
     * i.e. that each subtask of {@code IterationHead} received a fed-back {@code true}.
     *
     * <p>The job is attempted up to five times. After a failed attempt the failure is logged and
     * the iteration wait time (3000 ms times the scale) is doubled for the next attempt; the
     * failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testSimpleIteration() throws Exception {
        final int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; numRetry++) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                iterated = new boolean[parallelism];

                DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism * 2, false))
                        .map(noOpBoolMap).name("ParallelizeMap");

                IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
                DataStream<Boolean> feedback = iteration.flatMap(new IterationHead()).map(noOpBoolMap);

                iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink<Boolean>());
                iteration.closeWith(feedback).addSink(new ReceiveCheckNoOpSink<Boolean>());

                env.execute();

                for (boolean subtaskIterated : iterated) {
                    Assert.assertTrue(subtaskIterated);
                }
                return; // success, no further attempts needed
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                // Give the next attempt more time before the iteration head times out.
                timeoutScale *= 2;
            }
        }
    }

    /**
     * Runs a connected (co-)iteration at parallelism 2: integers enter through the first input,
     * strings are fed back through the second, and the flat-map output is collected by
     * {@code TestSink}. After execution the sorted sink content must equal
     * {@code ["1", "1", "2", "2", "2", "2"]}.
     *
     * <p>Before building the loop body the test also checks that {@code keyBy(1, 2)} on the
     * connected iteration is rejected with an {@link InvalidProgramException}.
     *
     * <p>The job is attempted up to five times. After a failed attempt the failure is logged and
     * the iteration wait time (2000 ms times the scale) is doubled for the next attempt; the
     * failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testCoIteration() throws Exception {
        final int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; numRetry++) {
            try {
                // Start every attempt with an empty result list.
                TestSink.collected = new ArrayList<>();

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(2);

                DataStream<String> otherSource = env.fromElements("1000", "2000")
                        .map(noOpStrMap).name("ParallelizeMap");

                IterativeStream.ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
                        .map(noOpIntMap).name("ParallelizeMap")
                        .iterate(2000 * timeoutScale)
                        .withFeedbackType("String");

                try {
                    coIt.keyBy(1, 2);
                    Assert.fail();
                } catch (InvalidProgramException expected) {
                    // keying the connected iteration by these positions must be rejected
                }

                DataStream<String> head = coIt.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
                    private static final long serialVersionUID = 1;
                    boolean seenFromSource = false;

                    @Override
                    public void flatMap1(Integer num, Collector<String> collector) throws Exception {
                        collector.collect(Integer.valueOf(num.intValue() + 1).toString());
                    }

                    @Override
                    public void flatMap2(String str, Collector<String> collector) throws Exception {
                        Integer valueOf = Integer.valueOf(str);
                        if (valueOf.intValue() < 2) {
                            collector.collect(Integer.valueOf(valueOf.intValue() + 1).toString());
                        }
                        // 1000 and 2000 are the values emitted by the extra string source.
                        if (valueOf.intValue() == 1000 || valueOf.intValue() == 2000) {
                            this.seenFromSource = true;
                        }
                    }

                    @Override
                    public void close() {
                        Assert.assertTrue(this.seenFromSource);
                    }
                });

                coIt.map(new CoMapFunction<Integer, String, String>() {
                    @Override
                    public String map1(Integer num) throws Exception {
                        return num.toString();
                    }

                    @Override
                    public String map2(String str) throws Exception {
                        return str;
                    }
                }).addSink(new ReceiveCheckNoOpSink<String>());

                coIt.closeWith(head.broadcast().union(otherSource));

                head.addSink(new TestSink()).setParallelism(1);

                Assert.assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());

                env.execute();

                Collections.sort(TestSink.collected);
                Assert.assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
                return; // success, no further attempts needed
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                // Give the next attempt more time before the iteration head times out.
                timeoutScale *= 2;
            }
        }
    }

    /**
     * Runs a keyed iteration (key = value modulo 3) at parallelism {@code parallelism - 1}, with
     * the maximum parallelism set to the same value. Each flat-map instance asserts that every
     * record it sees maps to the same {@code murmurHash(value % 3) % 3} as its first record, and
     * on close that it received more than one record.
     *
     * <p>The job is attempted up to five times. After a failed attempt the failure is logged and
     * the iteration wait time (3000 ms times the scale) is doubled for the next attempt; the
     * failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testGroupByFeedback() throws Exception {
        final int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; numRetry++) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism - 1);
                env.getConfig().setMaxParallelism(env.getParallelism());

                KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer num) throws Exception {
                        return Integer.valueOf(num.intValue() % 3);
                    }
                };

                DataStream<Integer> source = env.fromElements(1, 2, 3)
                        .map(noOpIntMap).name("ParallelizeMap");

                IterativeStream<Integer> iteration = source.keyBy(key).iterate(3000 * timeoutScale);

                DataStream<Integer> step = iteration.flatMap(new RichFlatMapFunction<Integer, Integer>() {
                    int received = 0;
                    int key = -1;

                    @Override
                    public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
                        this.received++;
                        if (this.key == -1) {
                            // Remember the hash bucket of the first record seen by this instance.
                            this.key = MathUtils.murmurHash(num.intValue() % 3) % 3;
                        } else {
                            Assert.assertEquals(this.key, MathUtils.murmurHash(num.intValue() % 3) % 3);
                        }
                        if (num.intValue() > 0) {
                            collector.collect(Integer.valueOf(num.intValue() - 1));
                        }
                    }

                    @Override
                    public void close() {
                        Assert.assertTrue(this.received > 1);
                    }
                });

                iteration.closeWith(step.keyBy(key).union(step.map(noOpIntMap).keyBy(key)))
                        .addSink(new ReceiveCheckNoOpSink<Integer>());

                env.execute();
                return; // success, no further attempts needed
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                // Give the next attempt more time before the iteration head times out.
                timeoutScale *= 2;
            }
        }
    }

    /**
     * Checks how iterations interact with checkpointing. Executing an iterative job must throw
     * {@link UnsupportedOperationException} both with default checkpointing and with
     * {@code enableCheckpointing(1, EXACTLY_ONCE, false)}. With the third argument set to
     * {@code true}, building the job graph must succeed.
     *
     * <p>The scenario is attempted up to five times. After a failed attempt the failure is
     * logged and the iteration wait time (3000 ms times the scale) is doubled for the next
     * attempt; the failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testWithCheckPointing() throws Exception {
        final int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; numRetry++) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing();

                IterativeStream<Boolean> iteration = env
                        .fromCollection(Collections.nCopies(parallelism * 2, false))
                        .map(noOpBoolMap).name("ParallelizeMap")
                        .iterate(3000 * timeoutScale);
                iteration.closeWith(iteration.flatMap(new IterationHead()))
                        .addSink(new ReceiveCheckNoOpSink<Boolean>());

                try {
                    env.execute();
                    Assert.fail();
                } catch (UnsupportedOperationException expected) {
                    // executing an iterative job with default checkpointing must be refused
                }

                try {
                    env.enableCheckpointing(1L, CheckpointingMode.EXACTLY_ONCE, false);
                    env.execute();
                    Assert.fail();
                } catch (UnsupportedOperationException expected) {
                    // still refused when the third (boolean) argument is false
                }

                // With the flag set to true the job graph must be buildable.
                env.enableCheckpointing(1L, CheckpointingMode.EXACTLY_ONCE, true);
                env.getStreamGraph().getJobGraph();
                return; // success, no further attempts needed
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                // Give the next attempt more time before the iteration head times out.
                timeoutScale *= 2;
            }
        }
    }
}
