package org.apache.flink.test.operators;

import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase.class */
public class PartitionITCase extends MultipleProgramsTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$ComparablePojo.class */
    /**
     * Two-field value holder with a no-argument constructor and getter/setter pairs.
     *
     * <p>Natural ordering: ascending on {@code first}; ties are broken by {@code second} in
     * <em>descending</em> order. Comparing instances whose fields are {@code null} fails with a
     * {@link NullPointerException}.
     */
    public static class ComparablePojo implements Comparable<ComparablePojo> {
        private Long first;
        private Long second;

        /** Creates an instance with both fields unset ({@code null}). */
        public ComparablePojo() {
        }

        /**
         * Creates an instance with both fields set.
         *
         * @param l value of the first (ascending) field
         * @param l2 value of the second (descending) field
         */
        public ComparablePojo(Long l, Long l2) {
            this.first = l;
            this.second = l2;
        }

        /** @return the first field */
        public Long getFirst() {
            return first;
        }

        /** @param l new value of the first field */
        public void setFirst(Long l) {
            first = l;
        }

        /** @return the second field */
        public Long getSecond() {
            return second;
        }

        /** @param l new value of the second field */
        public void setSecond(Long l) {
            second = l;
        }

        /**
         * Orders by {@code first} ascending, then by {@code second} descending.
         *
         * @param comparablePojo the instance to compare against
         * @return negative, zero or positive as this instance sorts before, equal to or after the argument
         */
        @Override // java.lang.Comparable
        public int compareTo(ComparablePojo comparablePojo) {
            int byFirst = Long.compare(first, comparablePojo.first);
            if (byFirst != 0) {
                return byFirst;
            }
            // Same first field: the second field sorts in reverse.
            return -Long.compare(second, comparablePojo.second);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$ComparablePojoComparator.class */
    /**
     * Serializable comparator that orders {@code (ComparablePojo, Long)} tuples by the natural
     * ordering of their first field; the {@code Long} field is ignored.
     */
    private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
        private ComparablePojoComparator() {
        }

        /**
         * @param tuple2 left-hand tuple
         * @param tuple22 right-hand tuple
         * @return result of {@link ComparablePojo#compareTo} on the two {@code f0} fields
         */
        @Override // java.util.Comparator
        public int compare(Tuple2<ComparablePojo, Long> tuple2, Tuple2<ComparablePojo, Long> tuple22) {
            ComparablePojo leftKey = tuple2.f0;
            ComparablePojo rightKey = tuple22.f0;
            return leftKey.compareTo(rightKey);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$ExtractComparablePojo.class */
    /**
     * Strips the {@code Long} payload from a pair of {@code (ComparablePojo, Long)} tuples,
     * emitting just the two {@link ComparablePojo} keys as a new pair.
     */
    private static class ExtractComparablePojo implements MapPartitionFunction<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>, Tuple2<ComparablePojo, ComparablePojo>> {
        private ExtractComparablePojo() {
        }

        /**
         * Emits one {@code (f0.f0, f1.f0)} tuple for every input element.
         *
         * @param iterable the elements of the partition
         * @param collector receives the extracted key pairs
         */
        public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> iterable, Collector<Tuple2<ComparablePojo, ComparablePojo>> collector) throws Exception {
            Iterator<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> pairs = iterable.iterator();
            while (pairs.hasNext()) {
                Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> pair = pairs.next();
                ComparablePojo firstKey = pair.f0.f0;
                ComparablePojo secondKey = pair.f1.f0;
                collector.collect(new Tuple2<>(firstKey, secondKey));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$Filter1.class */
    /** Filter that keeps only values strictly greater than 780. */
    private static class Filter1 implements FilterFunction<Long> {
        private static final long serialVersionUID = 1;

        /** Values must be strictly above this bound to pass the filter. */
        private static final long EXCLUSIVE_LOWER_BOUND = 780L;

        private Filter1() {
        }

        /**
         * @param l the value to test
         * @return {@code true} if the value is greater than 780
         */
        public boolean filter(Long l) throws Exception {
            long value = l;
            return value > EXCLUSIVE_LOWER_BOUND;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$KeySelector1.class */
    /** Key selector that uses the {@code Long} field ({@code f1}) of a 3-tuple as the key. */
    private static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1;

        private KeySelector1() {
        }

        /**
         * @param tuple3 the record to extract the key from
         * @return the record's {@code f1} field
         */
        public Long getKey(Tuple3<Integer, Long, String> tuple3) throws Exception {
            Long key = tuple3.f1;
            return key;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$LongComparator.class */
    /**
     * Serializable comparator for {@code Long} values whose direction (ascending or descending)
     * is fixed at construction time.
     */
    private static class LongComparator implements Comparator<Long>, Serializable {
        private final boolean ascending;

        /**
         * @param z {@code true} for ascending numeric order, {@code false} for descending
         */
        public LongComparator(boolean z) {
            this.ascending = z;
        }

        /**
         * @param l left-hand value
         * @param l2 right-hand value
         * @return the numeric comparison result, sign-flipped when this comparator is descending
         */
        @Override // java.util.Comparator
        public int compare(Long l, Long l2) {
            int natural = Long.compare(l, l2);
            if (ascending) {
                return natural;
            }
            return -natural;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$Mapper1.class */
    /**
     * Mapper that divides the second tuple field by 10 (integer division). The incoming tuple is
     * modified in place and returned.
     */
    private static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        private Mapper1() {
        }

        /**
         * @param tuple2 the record to scale; its {@code f1} field is overwritten
         * @return the same tuple instance with {@code f1} replaced by {@code f1 / 10}
         */
        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) throws Exception {
            int scaled = tuple2.f1 / 10;
            tuple2.f1 = scaled;
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$MinMaxSelector.class */
    /**
     * Reduces a partition to a single {@code (min, max)} pair according to the supplied comparator.
     *
     * @param <T> element type of the partition
     */
    private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
        private final Comparator<T> comparator;

        /**
         * @param comparator ordering used to decide which element is the minimum and which the maximum
         */
        public MinMaxSelector(Comparator<T> comparator) {
            this.comparator = comparator;
        }

        /**
         * Scans the partition once and emits its smallest and largest element as one tuple.
         * An empty partition emits nothing instead of failing with a
         * {@link java.util.NoSuchElementException}.
         *
         * @param iterable the elements of the partition
         * @param collector receives at most one {@code (min, max)} tuple
         */
        public void mapPartition(Iterable<T> iterable, Collector<Tuple2<T, T>> collector) throws Exception {
            Iterator<T> it = iterable.iterator();
            if (!it.hasNext()) {
                // A partition may legitimately receive no elements; there is no min/max to report.
                return;
            }
            T min = it.next();
            T max = min;
            while (it.hasNext()) {
                T candidate = it.next();
                if (this.comparator.compare(candidate, min) < 0) {
                    min = candidate;
                }
                if (this.comparator.compare(candidate, max) > 0) {
                    max = candidate;
                }
            }
            collector.collect(new Tuple2<>(min, max));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$ObjectSelfKeySelector.class */
    /** Identity key selector: every {@code Long} value is its own key. */
    private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
        private ObjectSelfKeySelector() {
        }

        /**
         * @param value the record
         * @return the record itself, unchanged
         */
        public Long getKey(Long value) throws Exception {
            return value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$PartitionIndexMapper.class */
    /**
     * Replaces every input value with the tuple {@code (index of the executing subtask, 1)};
     * the input value itself is not used.
     */
    private static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        private PartitionIndexMapper() {
        }

        /**
         * @param l the input value (ignored)
         * @return {@code (subtask index, 1)}
         */
        public Tuple2<Integer, Integer> map(Long l) throws Exception {
            Integer subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            return new Tuple2<>(subtaskIndex, 1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$PrefixMapper.class */
    /**
     * Truncates the string field ({@code f2}) of a 3-tuple to at most five characters. The
     * incoming tuple is modified in place and returned.
     */
    private static class PrefixMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        /** Maximum number of characters kept from the string field. */
        private static final int PREFIX_LENGTH = 5;

        private PrefixMapper() {
        }

        /**
         * @param tuple3 the record whose {@code f2} field is shortened if it is longer than five characters
         * @return the same tuple instance
         */
        public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> tuple3) throws Exception {
            String text = tuple3.f2;
            if (text.length() > PREFIX_LENGTH) {
                tuple3.f2 = text.substring(0, PREFIX_LENGTH);
            }
            return tuple3;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$Reducer1.class */
    /**
     * Reducer that sums the second field of two tuples and carries over the first field of the
     * left-hand tuple.
     */
    private static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        private Reducer1() {
        }

        /**
         * @param tuple2 left-hand record; supplies {@code f0} of the result
         * @param tuple22 right-hand record
         * @return a new tuple {@code (tuple2.f0, tuple2.f1 + tuple22.f1)}
         */
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) {
            Integer key = tuple2.f0;
            Integer total = tuple2.f1 + tuple22.f1;
            return new Tuple2<>(key, total);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$Tuple2Comparator.class */
    /**
     * Serializable lexicographic comparator for tuples whose two fields share one type: the first
     * field decides, the second field breaks ties. Results are normalised to -1, 0 or 1.
     *
     * @param <T> type of both tuple fields
     */
    private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
        private final Comparator<T> firstComparator;
        private final Comparator<T> secondComparator;

        /**
         * Uses the same comparator for both fields.
         *
         * @param comparator ordering applied to {@code f0} and to {@code f1}
         */
        public Tuple2Comparator(Comparator<T> comparator) {
            this(comparator, comparator);
        }

        /**
         * @param comparator ordering applied to {@code f0}
         * @param comparator2 ordering applied to {@code f1} when the {@code f0} fields compare equal
         */
        public Tuple2Comparator(Comparator<T> comparator, Comparator<T> comparator2) {
            this.firstComparator = comparator;
            this.secondComparator = comparator2;
        }

        /**
         * @param tuple2 left-hand tuple
         * @param tuple22 right-hand tuple
         * @return -1, 0 or 1
         */
        @Override // java.util.Comparator
        public int compare(Tuple2<T, T> tuple2, Tuple2<T, T> tuple22) {
            int byFirst = Integer.signum(firstComparator.compare(tuple2.f0, tuple22.f0));
            if (byFirst != 0) {
                return byFirst;
            }
            // First fields are equal: the second field decides.
            return Integer.signum(secondComparator.compare(tuple2.f1, tuple22.f1));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$UniqueLongMapper.class */
    /** Emits every distinct value of a partition exactly once. */
    private static class UniqueLongMapper implements MapPartitionFunction<Long, Long> {
        private static final long serialVersionUID = 1;

        private UniqueLongMapper() {
        }

        /**
         * Collects the partition into a hash set and emits the set's contents.
         *
         * @param iterable the values of the partition
         * @param collector receives each distinct value once
         */
        public void mapPartition(Iterable<Long> iterable, Collector<Long> collector) throws Exception {
            HashSet<Long> distinct = new HashSet<>();
            for (Long value : iterable) {
                distinct.add(value);
            }
            for (Long value : distinct) {
                collector.collect(value);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$UniqueNestedPojoLongMapper.class */
    /** Emits every distinct {@code nestedPojo.longNumber} value of a partition exactly once. */
    private static class UniqueNestedPojoLongMapper implements MapPartitionFunction<CollectionDataSets.POJO, Long> {
        private static final long serialVersionUID = 1;

        private UniqueNestedPojoLongMapper() {
        }

        /**
         * Collects the nested long numbers of the partition into a hash set and emits the set's contents.
         *
         * @param iterable the POJOs of the partition
         * @param collector receives each distinct nested long number once
         */
        public void mapPartition(Iterable<CollectionDataSets.POJO> iterable, Collector<Long> collector) throws Exception {
            HashSet<Long> distinct = new HashSet<>();
            for (CollectionDataSets.POJO pojo : iterable) {
                distinct.add(Long.valueOf(pojo.nestedPojo.longNumber));
            }
            for (Long value : distinct) {
                collector.collect(value);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/PartitionITCase$UniqueTupleLongMapper.class */
    /** Emits every distinct {@code f1} value of the 3-tuples in a partition exactly once. */
    private static class UniqueTupleLongMapper implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1;

        private UniqueTupleLongMapper() {
        }

        /**
         * Collects the {@code f1} fields of the partition into a hash set and emits the set's contents.
         *
         * @param iterable the tuples of the partition
         * @param collector receives each distinct {@code f1} value once
         */
        public void mapPartition(Iterable<Tuple3<Integer, Long, String>> iterable, Collector<Long> collector) throws Exception {
            HashSet<Long> distinct = new HashSet<>();
            for (Tuple3<Integer, Long, String> record : iterable) {
                distinct.add(record.f1);
            }
            for (Long key : distinct) {
                collector.collect(key);
            }
        }
    }

    /**
     * Creates the test case for one execution mode and passes it on to the base class.
     *
     * @param executionMode the execution mode this instance of the test runs in
     */
    public PartitionITCase(MultipleProgramsTestBase.TestExecutionMode executionMode) {
        super(executionMode);
    }

    /**
     * Hash-partitions the 3-tuple data set on field 1, emits the distinct keys seen in each
     * partition, and compares the collected output with the keys 1 to 6.
     */
    @Test
    public void testHashPartitionByKeyField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> uniqueKeys = CollectionDataSets.get3TupleDataSet(env)
                .partitionByHash(1)
                .mapPartition(new UniqueTupleLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueKeys, expected);
    }

    /**
     * Range-partitions the 3-tuple data set on field 1, emits the distinct keys seen in each
     * partition, and compares the collected output with the keys 1 to 6.
     */
    @Test
    public void testRangePartitionByKeyField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> uniqueKeys = CollectionDataSets.get3TupleDataSet(env)
                .partitionByRange(1)
                .mapPartition(new UniqueTupleLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueKeys, expected);
    }

    /**
     * Truncates the string field to a five-character prefix, hash-partitions on fields 1 and 2,
     * groups on the same fields and sums field 0; the sums are compared with the expected text.
     */
    @Test
    public void testHashPartitionByKeyField2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple3<Integer, Long, String>> sums = CollectionDataSets.get3TupleDataSet(env)
                .map(new PrefixMapper())
                .partitionByHash(1, 2)
                .groupBy(1, 2)
                .sum(0)
                .collect();

        String expected = "(1,1,Hi)\n"
                + "(5,2,Hello)\n"
                + "(4,3,Hello)\n"
                + "(5,3,I am )\n"
                + "(6,3,Luke )\n"
                + "(34,4,Comme)\n"
                + "(65,5,Comme)\n"
                + "(111,6,Comme)";
        TestBaseUtils.compareResultAsText(sums, expected);
    }

    /**
     * Truncates the string field to a five-character prefix, range-partitions on fields 1 and 2,
     * groups on the same fields and sums field 0; the sums are compared with the expected text.
     */
    @Test
    public void testRangePartitionByKeyField2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple3<Integer, Long, String>> sums = CollectionDataSets.get3TupleDataSet(env)
                .map(new PrefixMapper())
                .partitionByRange(1, 2)
                .groupBy(1, 2)
                .sum(0)
                .collect();

        String expected = "(1,1,Hi)\n"
                + "(5,2,Hello)\n"
                + "(4,3,Hello)\n"
                + "(5,3,I am )\n"
                + "(6,3,Luke )\n"
                + "(34,4,Comme)\n"
                + "(65,5,Comme)\n"
                + "(111,6,Comme)";
        TestBaseUtils.compareResultAsText(sums, expected);
    }

    /**
     * Unions the sequence 1..6 with itself, rebalances, hash-partitions on the whole value
     * ({@code "*"}) and checks the distinct values emitted per partition against 1 to 6.
     */
    @Test
    public void testHashPartitionOfAtomicType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> uniqueValues = env.generateSequence(1L, 6L)
                .union(env.generateSequence(1L, 6L))
                .rebalance()
                .partitionByHash("*")
                .mapPartition(new UniqueLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueValues, expected);
    }

    /**
     * Unions the sequence 1..6 with itself, rebalances, range-partitions on the whole value
     * ({@code "*"}) and checks the distinct values emitted per partition against 1 to 6.
     */
    @Test
    public void testRangePartitionOfAtomicType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> uniqueValues = env.generateSequence(1L, 6L)
                .union(env.generateSequence(1L, 6L))
                .rebalance()
                .partitionByRange("*")
                .mapPartition(new UniqueLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueValues, expected);
    }

    /**
     * Hash-partitions the 3-tuple data set using {@link KeySelector1} (the {@code f1} field) and
     * checks the distinct keys emitted per partition against 1 to 6.
     */
    @Test
    public void testHashPartitionByKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> uniqueKeys = CollectionDataSets.get3TupleDataSet(env)
                .partitionByHash(new KeySelector1())
                .mapPartition(new UniqueTupleLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueKeys, expected);
    }

    /**
     * Range-partitions the 3-tuple data set using {@link KeySelector1} (the {@code f1} field) and
     * checks the distinct keys emitted per partition against 1 to 6.
     */
    @Test
    public void testRangePartitionByKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> uniqueKeys = CollectionDataSets.get3TupleDataSet(env)
                .partitionByRange(new KeySelector1())
                .mapPartition(new UniqueTupleLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueKeys, expected);
    }

    /**
     * Filters 1..3000 down to the 2220 values above 780, rebalances them, and counts the elements
     * handled by each subtask. The per-subtask count, divided by 10 to allow small deviations, must
     * equal the evenly distributed share for every subtask index.
     */
    @Test
    public void testForcedRebalancing() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple2<Integer, Integer>> countsPerSubtask = env.generateSequence(1L, 3000L)
                .filter(new Filter1())
                .rebalance()
                .map(new PartitionIndexMapper())
                .groupBy(0)
                .reduce(new Reducer1())
                .map(new Mapper1())
                .collect();

        int parallelism = env.getParallelism();
        // 2220 elements survive the filter; Mapper1 divides each count by 10.
        int expectedScaledCount = (2220 / parallelism) / 10;

        StringBuilder expected = new StringBuilder();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            expected.append('(');
            expected.append(subtask);
            expected.append(',');
            expected.append(expectedScaledCount);
            expected.append(")\n");
        }
        TestBaseUtils.compareResultAsText(countsPerSubtask, expected.toString());
    }

    /**
     * Same check as the plain hash-partition test on field 1, but with environment parallelism 3
     * and the partition operator explicitly set to parallelism 4.
     */
    @Test
    public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        List<Long> uniqueKeys = CollectionDataSets.get3TupleDataSet(env)
                .partitionByHash(1)
                .setParallelism(4)
                .mapPartition(new UniqueTupleLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueKeys, expected);
    }

    /**
     * Same check as the plain range-partition test on field 1, but with environment parallelism 3
     * and the partition operator explicitly set to parallelism 4.
     */
    @Test
    public void testRangePartitionByKeyFieldAndDifferentParallelism() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        List<Long> uniqueKeys = CollectionDataSets.get3TupleDataSet(env)
                .partitionByRange(1)
                .setParallelism(4)
                .mapPartition(new UniqueTupleLongMapper())
                .collect();

        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText(uniqueKeys, expected);
    }

    /**
     * Hash-partitions the duplicate-POJO data set on the key expression
     * {@code nestedPojo.longNumber} (environment parallelism 3, partition operator parallelism 4)
     * and checks the distinct nested numbers emitted per partition.
     */
    @Test
    public void testHashPartitionWithKeyExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        List<Long> uniqueNumbers = CollectionDataSets.getDuplicatePojoDataSet(env)
                .partitionByHash("nestedPojo.longNumber")
                .setParallelism(4)
                .mapPartition(new UniqueNestedPojoLongMapper())
                .collect();

        String expected = "10000\n20000\n30000\n";
        TestBaseUtils.compareResultAsText(uniqueNumbers, expected);
    }

    /**
     * Range-partitions the duplicate-POJO data set on the key expression
     * {@code nestedPojo.longNumber} (environment parallelism 3, partition operator parallelism 4)
     * and checks the distinct nested numbers emitted per partition.
     */
    @Test
    public void testRangePartitionWithKeyExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        List<Long> uniqueNumbers = CollectionDataSets.getDuplicatePojoDataSet(env)
                .partitionByRange("nestedPojo.longNumber")
                .setParallelism(4)
                .mapPartition(new UniqueNestedPojoLongMapper())
                .collect();

        String expected = "10000\n20000\n30000\n";
        TestBaseUtils.compareResultAsText(uniqueNumbers, expected);
    }

    /**
     * Range-partitions the sequence 0..10000 on the value itself and reduces each partition to its
     * {@code (min, max)} pair. After sorting the pairs, every partition but the first must have
     * {@code min < max} and must start exactly one above the previous partition's maximum.
     */
    @Test
    public void testRangePartitionerOnSequenceData() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Long> sequence = env.generateSequence(0L, 10000L);
        KeySelector<Long, Long> identityKey = new ObjectSelfKeySelector();
        MinMaxSelector<Long> minMax = new MinMaxSelector<Long>(new LongComparator(true));
        Comparator<Tuple2<Long, Long>> byMinThenMax = new Tuple2Comparator<Long>(new LongComparator(true));

        List<Tuple2<Long, Long>> ranges = sequence
                .partitionByRange(identityKey)
                .mapPartition(minMax)
                .collect();
        Collections.sort(ranges, byMinThenMax);

        // -1 marks "no partition seen yet"; the sequence itself starts at 0.
        long previousMax = -1;
        for (Tuple2<Long, Long> range : ranges) {
            if (previousMax != -1) {
                long currentMin = range.f0;
                Assert.assertTrue(currentMin < range.f1);
                Assert.assertEquals(previousMax + 1, currentMin);
            }
            previousMax = range.f1;
        }
    }

    /**
     * Builds a delta iteration whose step function range-partitions the workset and expects the
     * program to fail with {@link InvalidProgramException}. In {@code COLLECTION} mode the
     * scenario does not apply, so the expected exception is thrown directly.
     */
    @Test(expected = InvalidProgramException.class)
    public void testRangePartitionInIteration() throws Exception {
        if (mode == MultipleProgramsTestBase.TestExecutionMode.COLLECTION) {
            throw new InvalidProgramException("Does not apply for collection execution");
        }

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator<Long, Tuple2<Long, String>> source = env.generateSequence(0L, 10000L).map(new MapFunction<Long, Tuple2<Long, String>>() {
            public Tuple2<Long, String> map(Long value) throws Exception {
                String text = Long.toString(value);
                return new Tuple2<>(value, text);
            }
        });

        DeltaIteration<Tuple2<Long, String>, Tuple2<Long, String>> iteration = source.iterateDelta(source, 10, 0);

        // The range partition inside the iteration body is what the test expects to be rejected.
        JoinOperator.ProjectJoin<Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> body = iteration.getWorkset()
                .partitionByRange(1)
                .join(iteration.getSolutionSet())
                .where(0)
                .equalTo(0)
                .projectFirst(0)
                .projectSecond(1);

        iteration.closeWith(body, body).collect();
    }

    /**
     * Maps 0..10000 to {@code (v / 5000, v % 5000)}, range-partitions on both fields with orders
     * (ascending, descending) and reduces each partition to its {@code (min, max)} pair under the
     * same ordering. The sorted partition ranges must not overlap, and where two neighbouring
     * ranges share the first field, the second field must continue downwards by exactly one.
     */
    @Test
    public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator<Long, Tuple2<Long, Long>> pairs = env.generateSequence(0L, 10000L).map(new MapFunction<Long, Tuple2<Long, Long>>() {
            public Tuple2<Long, Long> map(Long value) throws Exception {
                long v = value;
                return new Tuple2<Long, Long>(v / 5000, v % 5000);
            }
        });

        Tuple2Comparator<Long> pairOrder = new Tuple2Comparator<Long>(new LongComparator(true), new LongComparator(false));

        List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> ranges = pairs
                .partitionByRange(0, 1)
                .withOrders(Order.ASCENDING, Order.DESCENDING)
                .mapPartition(new MinMaxSelector<Tuple2<Long, Long>>(pairOrder))
                .collect();
        Collections.sort(ranges, new Tuple2Comparator<Tuple2<Long, Long>>(pairOrder));

        Tuple2<Long, Long> previousMax = null;
        for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> range : ranges) {
            Tuple2<Long, Long> currentMin = range.f0;
            Tuple2<Long, Long> currentMax = range.f1;
            Assert.assertTrue("Min element in each partition should be smaller than max.", pairOrder.compare(currentMin, currentMax) <= 0);
            if (previousMax != null) {
                Assert.assertTrue("Partitions overlap. Previous max should be smaller than current min.", pairOrder.compare(previousMax, currentMin) < 0);
                if (previousMax.f0.equals(currentMin.f0)) {
                    // The second field is ordered descending, so the next partition continues one below.
                    long expectedNext = previousMax.f1 - 1;
                    long actualNext = currentMin.f1;
                    Assert.assertEquals("Ordering on the second field should be continous.", expectedNext, actualNext);
                }
            }
            previousMax = currentMax;
        }
    }

    /**
     * Maps 0..10000 to {@code ((v / 5000, v % 5000), v)}, range-partitions on the nested tuple in
     * field 0 with ascending order, drops the payload and reduces each partition to its
     * {@code (min, max)} pair. The sorted partition ranges must not overlap, and where two
     * neighbouring ranges share the first field, the second field must continue upwards by one.
     */
    @Test
    public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator<Long, Tuple2<Tuple2<Long, Long>, Long>> nested = env.generateSequence(0L, 10000L).map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
            public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
                long v = value;
                Tuple2<Long, Long> key = new Tuple2<Long, Long>(v / 5000, v % 5000);
                return new Tuple2<>(key, value);
            }
        });

        Tuple2Comparator<Long> pairOrder = new Tuple2Comparator<Long>(new LongComparator(true), new LongComparator(true));

        List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> ranges = nested
                .partitionByRange(0)
                .withOrders(Order.ASCENDING)
                .mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long, Long>, Long>, Tuple2<Long, Long>>() {
                    public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> records, Collector<Tuple2<Long, Long>> out) throws Exception {
                        // Keep only the nested key tuple of every record.
                        for (Tuple2<Tuple2<Long, Long>, Long> record : records) {
                            out.collect(record.f0);
                        }
                    }
                })
                .mapPartition(new MinMaxSelector<Tuple2<Long, Long>>(pairOrder))
                .collect();
        Collections.sort(ranges, new Tuple2Comparator<Tuple2<Long, Long>>(pairOrder));

        Tuple2<Long, Long> previousMax = null;
        for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> range : ranges) {
            Tuple2<Long, Long> currentMin = range.f0;
            Tuple2<Long, Long> currentMax = range.f1;
            Assert.assertTrue("Min element in each partition should be smaller than max.", pairOrder.compare(currentMin, currentMax) <= 0);
            if (previousMax != null) {
                Assert.assertTrue("Partitions overlap. Previous max should be smaller than current min.", pairOrder.compare(previousMax, currentMin) < 0);
                if (previousMax.f0.equals(currentMin.f0)) {
                    // Both fields are ordered ascending, so the next partition continues one above.
                    long expectedNext = previousMax.f1 + 1;
                    long actualNext = currentMin.f1;
                    Assert.assertEquals("Ordering on the second field should be continous.", expectedNext, actualNext);
                }
            }
            previousMax = currentMax;
        }
    }

    /**
     * Maps 0..10000 to {@code (ComparablePojo(v / 5000, v % 5000), v)}, range-partitions with a
     * key selector that returns the POJO (ascending order), and reduces each partition to the
     * {@code (min, max)} pair of its POJO keys. The sorted partition ranges must not overlap, and
     * where two neighbouring ranges share {@code first}, {@code second} must continue downwards by
     * exactly one, because {@link ComparablePojo} orders {@code second} descending.
     */
    @Test
    public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple2<ComparablePojo, ComparablePojo>> ranges = env.generateSequence(0L, 10000L)
                .map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
                    public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
                        long v = value;
                        ComparablePojo key = new ComparablePojo(v / 5000, v % 5000);
                        return new Tuple2<>(key, value);
                    }
                })
                .partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
                    public ComparablePojo getKey(Tuple2<ComparablePojo, Long> record) throws Exception {
                        return record.f0;
                    }
                })
                .withOrders(Order.ASCENDING)
                .mapPartition(new MinMaxSelector<Tuple2<ComparablePojo, Long>>(new ComparablePojoComparator()))
                .mapPartition(new ExtractComparablePojo())
                .collect();

        // Sort the partition ranges by their minimum. Comparing min against min keeps the
        // comparator consistent (compare(x, x) == 0), which Collections.sort relies on; comparing
        // one range's min against another's max would not be.
        Collections.sort(ranges, new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
            @Override // java.util.Comparator
            public int compare(Tuple2<ComparablePojo, ComparablePojo> left, Tuple2<ComparablePojo, ComparablePojo> right) {
                return left.f0.compareTo(right.f0);
            }
        });

        ComparablePojo previousMax = null;
        for (Tuple2<ComparablePojo, ComparablePojo> range : ranges) {
            ComparablePojo currentMin = range.f0;
            ComparablePojo currentMax = range.f1;
            Assert.assertTrue("Min element in each partition should be smaller than max.", currentMin.compareTo(currentMax) <= 0);
            if (previousMax != null) {
                Assert.assertTrue("Partitions overlap. Previous max should be smaller than current min.", previousMax.compareTo(currentMin) < 0);
                if (previousMax.getFirst().equals(currentMin.getFirst())) {
                    long expectedNext = previousMax.getSecond() - 1;
                    long actualNext = currentMin.getSecond();
                    Assert.assertEquals("Ordering on the second field should be continous.", expectedNext, actualNext);
                }
            }
            previousMax = currentMax;
        }
    }
}
