package org.apache.flink.streaming.api;

import java.util.Collections;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/IterateTest.class */
public class IterateTest {
    private static final long MEMORYSIZE = 32;
    private static boolean[] iterated;
    private static int PARALLELISM = 2;

    /* loaded from: input_file:org/apache/flink/streaming/api/IterateTest$IterationHead.class */
    public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
        private static final long serialVersionUID = 1;

        public void flatMap(Boolean bool, Collector<Boolean> collector) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            if (bool.booleanValue()) {
                IterateTest.iterated[indexOfThisSubtask] = true;
            } else {
                collector.collect(bool);
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Boolean) obj, (Collector<Boolean>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/IterateTest$IterationTail.class */
    public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
        private static final long serialVersionUID = 1;

        public void flatMap(Boolean bool, Collector<Boolean> collector) throws Exception {
            collector.collect(true);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Boolean) obj, (Collector<Boolean>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/IterateTest$MySink.class */
    public static final class MySink implements SinkFunction<Boolean> {
        private static final long serialVersionUID = 1;

        public void invoke(Boolean bool) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/IterateTest$NoOpMap.class */
    public static final class NoOpMap implements MapFunction<Boolean, Boolean> {
        private static final long serialVersionUID = 1;

        public Boolean map(Boolean bool) throws Exception {
            return bool;
        }
    }

    public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.setBufferTimeout(10L);
        IterativeDataStream iterate = streamExecutionEnvironment.fromCollection(Collections.nCopies(PARALLELISM, false)).iterate(3000L);
        iterate.closeWith(iterate.flatMap(new IterationHead()).flatMap(new IterationTail())).addSink(new MySink());
        return streamExecutionEnvironment;
    }

    @Test
    public void testColocation() throws Exception {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(4, MEMORYSIZE);
        IterativeDataStream iterate = testStreamEnvironment.fromElements(new Boolean[]{true}).rebalance().map(new NoOpMap()).iterate();
        iterate.closeWith(iterate.map(new NoOpMap()).setParallelism(2).name("HeadOperator").map(new NoOpMap()).setParallelism(3).name("TailOperator")).print();
        JobVertex jobVertex = null;
        JobVertex jobVertex2 = null;
        JobVertex jobVertex3 = null;
        JobVertex jobVertex4 = null;
        for (JobVertex jobVertex5 : testStreamEnvironment.getStreamGraph().getJobGraph().getVertices()) {
            if (jobVertex5.getName().contains("IterationSource")) {
                jobVertex = jobVertex5;
            } else if (jobVertex5.getName().contains("IterationSink")) {
                jobVertex2 = jobVertex5;
            } else if (jobVertex5.getName().contains("HeadOperator")) {
                jobVertex3 = jobVertex5;
            } else if (jobVertex5.getName().contains("TailOp")) {
                jobVertex4 = jobVertex5;
            }
        }
        Assert.assertTrue(jobVertex.getCoLocationGroup() != null);
        Assert.assertEquals(jobVertex.getCoLocationGroup(), jobVertex2.getCoLocationGroup());
        Assert.assertEquals(jobVertex3.getParallelism(), 2L);
        Assert.assertEquals(jobVertex4.getParallelism(), 3L);
        Assert.assertEquals(jobVertex.getParallelism(), jobVertex2.getParallelism());
    }

    @Test
    public void testPartitioning() throws Exception {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(4, MEMORYSIZE);
        IterativeDataStream iterate = testStreamEnvironment.fromElements(new Boolean[]{true}).iterate();
        IterativeDataStream iterate2 = testStreamEnvironment.fromElements(new Boolean[]{true}).iterate();
        DataStream broadcast = iterate.map(new NoOpMap()).name("Head1").broadcast();
        DataStream broadcast2 = iterate2.map(new NoOpMap()).name("Head2").broadcast();
        iterate.closeWith(broadcast.union(new DataStream[]{broadcast.map(new NoOpMap()).shuffle()}), true);
        iterate2.closeWith(broadcast2, false);
        System.out.println(testStreamEnvironment.getExecutionPlan());
        for (StreamGraph.StreamLoop streamLoop : testStreamEnvironment.getStreamGraph().getStreamLoops()) {
            StreamEdge streamEdge = (StreamEdge) streamLoop.getSink().getInEdges().get(0);
            if (streamEdge.getSourceVertex().getOperatorName().contains("Head1")) {
                Assert.assertTrue(streamEdge.getPartitioner() instanceof BroadcastPartitioner);
                Assert.assertTrue(((StreamEdge) streamLoop.getSink().getInEdges().get(1)).getPartitioner() instanceof ShufflePartitioner);
            } else {
                Assert.assertTrue(streamEdge.getPartitioner() instanceof RebalancePartitioner);
            }
        }
    }

    @Test
    public void test() throws Exception {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
        iterated = new boolean[PARALLELISM];
        constructIterativeJob(testStreamEnvironment).execute();
        for (boolean z : iterated) {
            Assert.assertTrue(z);
        }
    }

    @Test
    public void testWithCheckPointing() throws Exception {
        StreamExecutionEnvironment constructIterativeJob = constructIterativeJob(new TestStreamEnvironment(PARALLELISM, MEMORYSIZE));
        constructIterativeJob.enableCheckpointing();
        try {
            constructIterativeJob.execute();
            Assert.fail();
        } catch (UnsupportedOperationException e) {
        }
        try {
            constructIterativeJob.enableCheckpointing(1L, false);
            constructIterativeJob.execute();
            Assert.fail();
        } catch (UnsupportedOperationException e2) {
        }
        constructIterativeJob.enableCheckpointing(1L, true);
        constructIterativeJob.getStreamGraph().getJobGraph();
    }
}
