package org.apache.flink.optimizer.java;

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/java/PartitionOperatorTest.class */
public class PartitionOperatorTest extends CompilerTestBase {
    @Test
    public void testPartitionCustomOperatorPreservesFields() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromCollection(Collections.singleton(new Tuple2(0L, 0L))).partitionCustom(new Partitioner<Long>() { // from class: org.apache.flink.optimizer.java.PartitionOperatorTest.1
                public int partition(Long l, int i) {
                    return l.intValue();
                }
            }, 1).groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducerCombinable()).output(new DiscardingOutputFormat());
            SingleInputPlanNode source = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource();
            SingleInputPlanNode source2 = source.getInput().getSource();
            Assert.assertEquals(ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source2.getInput().getShipStrategy());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRangePartitionOperatorPreservesFields() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromCollection(Collections.singleton(new Tuple2(0L, 0L))).partitionByRange(new int[]{1}).groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducerCombinable()).output(new DiscardingOutputFormat());
            OptimizedPlan compileNoStats = compileNoStats(executionEnvironment.createProgramPlan());
            SingleInputPlanNode source = ((SinkPlanNode) compileNoStats.getDataSinks().iterator().next()).getInput().getSource();
            SingleInputPlanNode source2 = source.getInput().getSource();
            SingleInputPlanNode source3 = source2.getInput().getSource();
            Assert.assertEquals(ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, source2.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source3.getInput().getShipStrategy());
            List outgoingChannels = ((SourcePlanNode) compileNoStats.getDataSources().iterator().next()).getOutgoingChannels();
            Assert.assertEquals(2L, outgoingChannels.size());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels.get(0)).getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels.get(1)).getShipStrategy());
            Assert.assertEquals(DataExchangeMode.PIPELINED, ((Channel) outgoingChannels.get(0)).getDataExchangeMode());
            Assert.assertEquals(DataExchangeMode.BATCH, ((Channel) outgoingChannels.get(1)).getDataExchangeMode());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRangePartitionOperatorPreservesFields2() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            DataSource fromCollection = executionEnvironment.fromCollection(Collections.singleton(new Tuple2(0L, 0L)));
            PartitionOperator partitionByRange = fromCollection.partitionByRange(new int[]{1});
            partitionByRange.groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducerCombinable()).output(new DiscardingOutputFormat());
            fromCollection.groupBy(new int[]{0}).aggregate(Aggregations.SUM, 1).map(new MapFunction<Tuple2<Long, Long>, Long>() { // from class: org.apache.flink.optimizer.java.PartitionOperatorTest.2
                public Long map(Tuple2<Long, Long> tuple2) throws Exception {
                    return (Long) tuple2.f1;
                }
            }).output(new DiscardingOutputFormat());
            partitionByRange.filter(new FilterFunction<Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.java.PartitionOperatorTest.3
                public boolean filter(Tuple2<Long, Long> tuple2) throws Exception {
                    return ((Long) tuple2.f0).longValue() % 2 == 0;
                }
            }).output(new DiscardingOutputFormat());
            OptimizedPlan compileNoStats = compileNoStats(executionEnvironment.createProgramPlan());
            SingleInputPlanNode source = ((SinkPlanNode) compileNoStats.getDataSinks().iterator().next()).getInput().getSource();
            SingleInputPlanNode source2 = source.getInput().getSource();
            SingleInputPlanNode source3 = source2.getInput().getSource();
            Assert.assertEquals(ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, source2.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source3.getInput().getShipStrategy());
            List outgoingChannels = ((SourcePlanNode) compileNoStats.getDataSources().iterator().next()).getOutgoingChannels();
            Assert.assertEquals(3L, outgoingChannels.size());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels.get(0)).getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels.get(1)).getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels.get(2)).getShipStrategy());
            Assert.assertEquals(DataExchangeMode.PIPELINED, ((Channel) outgoingChannels.get(0)).getDataExchangeMode());
            Assert.assertEquals(DataExchangeMode.PIPELINED, ((Channel) outgoingChannels.get(1)).getDataExchangeMode());
            Assert.assertEquals(DataExchangeMode.BATCH, ((Channel) outgoingChannels.get(2)).getDataExchangeMode());
            List outgoingChannels2 = source2.getOutgoingChannels();
            Assert.assertEquals(2L, outgoingChannels2.size());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels2.get(0)).getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, ((Channel) outgoingChannels2.get(1)).getShipStrategy());
            Assert.assertEquals(DataExchangeMode.PIPELINED, ((Channel) outgoingChannels2.get(0)).getDataExchangeMode());
            Assert.assertEquals(DataExchangeMode.PIPELINED, ((Channel) outgoingChannels2.get(1)).getDataExchangeMode());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
