package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.class */
public class CombiningUnilateralSortMergerITCase {
    /** Logger used for the debug progress messages written by the tests. */
    private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMergerITCase.class);
    /** Seed handed to the tuple generator in testSortAndValidate. */
    private static final long SEED = 649180756312423613L;
    /** Upper bound of the keys pre-registered in testSortAndValidate's count table; also passed to the tuple generator. */
    private static final int KEY_MAX = 1000;
    /** Value-length argument passed to the tuple generator in testSortAndValidate. */
    private static final int VALUE_LENGTH = 118;
    /** Number of records emitted into the sorter by testSortAndValidate. */
    private static final int NUM_PAIRS = 50000;
    /**
     * 268435456 = 256 * 1024 * 1024. beforeTest passes the same number to the MemoryManager
     * constructor, so this is presumably the managed memory size in bytes -- TODO confirm.
     */
    public static final int MEMORY_SIZE = 268435456;
    /** Owner task passed to every sorter created by the tests. */
    private final AbstractInvokable parentTask = new DummyInvokable();
    /** I/O manager; created in beforeTest and shut down in afterTest. */
    private IOManager ioManager;
    /** Memory manager; created in beforeTest, verified empty and shut down in afterTest. */
    private MemoryManager memoryManager;
    /** Serializer factory for (Integer, String) tuples, used by testSortAndValidate. */
    private TypeSerializerFactory<Tuple2<Integer, String>> serializerFactory1;
    /** Serializer factory for (Integer, Integer) tuples, used by testCombine and testCombineSpilling. */
    private TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactory2;
    /** Comparator for (Integer, String) tuples, used by testSortAndValidate. */
    private TypeComparator<Tuple2<Integer, String>> comparator1;
    /** Comparator for (Integer, Integer) tuples, used by testCombine and testCombineSpilling. */
    private TypeComparator<Tuple2<Integer, Integer>> comparator2;

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase$TestCountCombiner.class */
    /**
     * Combiner for (Integer, Integer) tuples that adds up field 1 of all tuples in a group.
     *
     * <p>The {@code opened} / {@code closed} flags record whether {@link #open(Configuration)}
     * and {@link #close()} were invoked, so a test can inspect them afterwards.
     */
    public static class TestCountCombiner extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;
        /** Sum computed by the most recent call to combine. */
        private Integer count = 0;
        /** Set to true by open. */
        public volatile boolean opened = false;
        /** Set to true by close. */
        public volatile boolean closed = false;

        /**
         * Sums field 1 over the whole group, writes the sum into field 1 of the last tuple
         * handed out by the iterable (or of a fresh, empty tuple when the iterable is empty)
         * and emits that tuple.
         *
         * @param iterable the tuples of one group
         * @param collector receives the single combined tuple
         */
        public void combine(Iterable<Tuple2<Integer, Integer>> iterable, Collector<Tuple2<Integer, Integer>> collector) {
            Tuple2<Integer, Integer> last = new Tuple2<>();
            int sum = 0;
            for (Tuple2<Integer, Integer> element : iterable) {
                // remember the most recent element; it becomes the carrier of the result
                last = element;
                sum += element.f1;
            }
            this.count = sum;
            last.setField(this.count, 1);
            collector.collect(last);
        }

        /** Intentionally empty: this function is only exercised through combine. */
        public void reduce(Iterable<Tuple2<Integer, Integer>> iterable, Collector<Tuple2<Integer, Integer>> collector) {
        }

        /** Records that the function has been opened. */
        public void open(Configuration configuration) throws Exception {
            this.opened = true;
        }

        /** Records that the function has been closed. */
        public void close() throws Exception {
            this.closed = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase$TestCountCombiner2.class */
    /**
     * Combiner for (Integer, String) tuples whose string field holds a decimal integer;
     * it adds up the parsed values of all tuples in a group.
     *
     * <p>The {@code opened} / {@code closed} flags record whether {@link #open(Configuration)}
     * and {@link #close()} were invoked, so a test can inspect them afterwards.
     */
    public static class TestCountCombiner2 extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1;
        /** Set to true by open. */
        public volatile boolean opened = false;
        /** Set to true by close. */
        public volatile boolean closed = false;

        /**
         * Parses field 1 of every tuple as an int, sums the values and emits a new tuple that
         * carries the key of the last tuple seen together with the sum rendered as a string.
         *
         * @param iterable the tuples of one group
         * @param collector receives the single combined tuple
         */
        public void combine(Iterable<Tuple2<Integer, String>> iterable, Collector<Tuple2<Integer, String>> collector) {
            Tuple2<Integer, String> last = new Tuple2<>();
            int sum = 0;
            for (Tuple2<Integer, String> element : iterable) {
                last = element;
                sum += Integer.parseInt(element.f1);
            }
            Tuple2<Integer, String> combined = new Tuple2<Integer, String>(last.f0, Integer.toString(sum));
            collector.collect(combined);
        }

        /** Intentionally empty: this function is only exercised through combine. */
        public void reduce(Iterable<Tuple2<Integer, String>> iterable, Collector<Tuple2<Integer, String>> collector) {
        }

        /** Records that the function has been opened. */
        public void open(Configuration configuration) throws Exception {
            this.opened = true;
        }

        /** Records that the function has been closed. */
        public void close() throws Exception {
            this.closed = true;
        }
    }

    /**
     * Creates the fixtures used by every test: a memory manager, an asynchronous I/O manager,
     * and the serializer factories / comparators for the two tuple types.
     */
    @Before
    public void beforeTest() {
        // 256 * 1024 * 1024 = 268435456
        memoryManager = new MemoryManager(256L * 1024L * 1024L, 1);
        ioManager = new IOManagerAsync();

        // (Integer, String) tuples
        comparator1 = TestData.getIntStringTupleComparator();
        serializerFactory1 = TestData.getIntStringTupleSerializerFactory();

        // (Integer, Integer) tuples
        comparator2 = TestData.getIntIntTupleComparator();
        serializerFactory2 = TestData.getIntIntTupleSerializerFactory();
    }

    /**
     * Tears down the fixtures created in beforeTest.
     *
     * <p>Shuts the I/O manager down and fails if it did not shut down properly, then fails if
     * the memory manager still has outstanding segments. The memory manager is shut down in a
     * {@code finally} block so that it is released even when one of those checks fails.
     */
    @After
    public void afterTest() {
        try {
            // ioManager stays null when beforeTest failed before assigning it
            if (this.ioManager != null) {
                this.ioManager.shutdown();
                Assert.assertTrue("I/O Manager was not properly shut down.", this.ioManager.isProperlyShutDown());
            }
            if (this.memoryManager != null) {
                Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", this.memoryManager.verifyEmpty());
            }
        } finally {
            // always release the memory manager, even if one of the checks above threw
            if (this.memoryManager != null) {
                this.memoryManager.shutdown();
                this.memoryManager = null;
            }
        }
    }

    /**
     * Emits 10000 rounds of the keys 0..99, each record carrying the value 1, through a
     * combining sorter and checks that the values of every key group add up to 10000 and that
     * exactly 100 key groups come out.
     *
     * <p>The sorter is closed in a {@code finally} block so that a failing assertion does not
     * leave it open; afterwards the combiner must be closed if (and only if) it was opened.
     */
    @Test
    public void testCombine() throws Exception {
        final int numKeys = 100;
        final int recordsPerKey = 10000;

        TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
        LOG.debug("initializing sortmerger");
        TestCountCombiner combiner = new TestCountCombiner();
        CombiningUnilateralSortMerger<Tuple2<Integer, Integer>> sorter = new CombiningUnilateralSortMerger<Tuple2<Integer, Integer>>(combiner, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2, 0.25d, 64, 0.7f, false);
        try {
            // one record object is re-used for all emits; only the key field changes
            Tuple2<Integer, Integer> rec = new Tuple2<>();
            rec.setField(1, 1);
            for (int round = 0; round < recordsPerKey; round++) {
                for (int key = 0; key < numKeys; key++) {
                    rec.setField(key, 0);
                    reader.emit(rec);
                }
            }
            reader.close();

            Iterator<Integer> sums = getReducingIterator(sorter.getIterator(), this.serializerFactory2.getSerializer(), this.comparator2.duplicate());
            int groups = 0;
            while (sums.hasNext()) {
                Assert.assertEquals(recordsPerKey, sums.next().intValue());
                groups++;
            }
            // guards against the loop above passing vacuously on empty output
            Assert.assertEquals(numKeys, groups);
        } finally {
            sorter.close();
        }
        // if the combiner was opened, it must have been closed as well
        Assert.assertTrue(combiner.opened == combiner.closed);
    }

    /**
     * Emits 10000 rounds of the keys 0..99, each record carrying the value 1, through a
     * combining sorter and checks that the values of every key group add up to 10000 and that
     * exactly 100 key groups come out.
     *
     * <p>Compared to testCombine the sorter is constructed with 0.01d / 0.005f / true instead
     * of 0.25d / 0.7f / false -- presumably to force spilling, as the test name suggests
     * (TODO confirm against the sorter's constructor documentation).
     *
     * <p>The sorter is closed in a {@code finally} block so that a failing assertion does not
     * leave it open; afterwards the combiner must be closed if (and only if) it was opened.
     */
    @Test
    public void testCombineSpilling() throws Exception {
        final int numKeys = 100;
        final int recordsPerKey = 10000;

        TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
        LOG.debug("initializing sortmerger");
        TestCountCombiner combiner = new TestCountCombiner();
        CombiningUnilateralSortMerger<Tuple2<Integer, Integer>> sorter = new CombiningUnilateralSortMerger<Tuple2<Integer, Integer>>(combiner, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2, 0.01d, 64, 0.005f, true);
        try {
            // one record object is re-used for all emits; only the key field changes
            Tuple2<Integer, Integer> rec = new Tuple2<>();
            rec.setField(1, 1);
            for (int round = 0; round < recordsPerKey; round++) {
                for (int key = 0; key < numKeys; key++) {
                    rec.setField(key, 0);
                    reader.emit(rec);
                }
            }
            reader.close();

            Iterator<Integer> sums = getReducingIterator(sorter.getIterator(), this.serializerFactory2.getSerializer(), this.comparator2.duplicate());
            int groups = 0;
            while (sums.hasNext()) {
                Assert.assertEquals(recordsPerKey, sums.next().intValue());
                groups++;
            }
            // guards against the loop above passing vacuously on empty output
            Assert.assertEquals(numKeys, groups);
        } finally {
            sorter.close();
        }
        // if the combiner was opened, it must have been closed as well
        Assert.assertTrue(combiner.opened == combiner.closed);
    }

    /**
     * Sends NUM_PAIRS generated (key, "1") records through a combining sorter and validates
     * the output in two ways:
     * <ul>
     *   <li>keys must come out in non-descending order, and</li>
     *   <li>for every key, the combined counts read back must add up to the number of records
     *       that were emitted for that key.</li>
     * </ul>
     *
     * <p>The previously seen key is kept as a primitive rather than as a reference to the
     * previous record: the record returned by the iterator is handed back as the reuse object
     * for the next call, so a retained reference could be overwritten and the order check
     * would end up comparing a key with itself.
     */
    @Test
    public void testSortAndValidate() throws Exception {
        // per key: (#records emitted) - (sum of counts read back); must be 0 at the end
        final Hashtable<Integer, Integer> countTable = new Hashtable<Integer, Integer>(KEY_MAX);
        for (int key = 1; key <= KEY_MAX; key++) {
            countTable.put(key, 0);
        }
        final IntComparator keyComparator = new IntComparator(true);
        TestData.MockTuple2Reader<Tuple2<Integer, String>> reader = TestData.getIntStringTupleReader();
        LOG.debug("initializing sortmerger");
        TestCountCombiner2 combiner = new TestCountCombiner2();
        CombiningUnilateralSortMerger<Tuple2<Integer, String>> sorter = new CombiningUnilateralSortMerger<Tuple2<Integer, String>>(combiner, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory1, this.comparator1, 0.25d, 2, 0.7f, false);
        try {
            LOG.debug("emitting data");
            TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            Tuple2<Integer, String> rec = new Tuple2<>();
            for (int i = 0; i < NUM_PAIRS; i++) {
                rec = generator.next(rec);
                Assert.assertTrue(rec != null);
                Integer key = rec.f0;
                // replace the generated value by "1" so the combiner can sum up occurrences
                rec.setField("1", 1);
                reader.emit(rec);
                countTable.put(key, countTable.get(key) + 1);
            }
            reader.close();

            MutableObjectIterator<Tuple2<Integer, String>> iterator = sorter.getIterator();
            LOG.debug("checking results");

            Tuple2<Integer, String> current = iterator.next(new Tuple2<Integer, String>());
            Assert.assertTrue(current != null);
            int previousKey = current.f0;
            countTable.put(previousKey, countTable.get(previousKey) - Integer.parseInt(current.f1));

            // the record just returned is handed back as the reuse object for the next call
            while ((current = iterator.next(current)) != null) {
                int currentKey = current.f0;
                Assert.assertTrue(keyComparator.compare(previousKey, currentKey) <= 0);
                countTable.put(currentKey, countTable.get(currentKey) - Integer.parseInt(current.f1));
                previousKey = currentKey;
            }

            for (Integer remaining : countTable.values()) {
                Assert.assertTrue(remaining.intValue() == 0);
            }
        } finally {
            sorter.close();
        }
        // if the combiner was opened, it must have been closed as well
        Assert.assertTrue(combiner.opened == combiner.closed);
    }

    /**
     * Wraps a sorted stream of (Integer, Integer) tuples into an iterator that yields, for each
     * key group, the sum of field 1 over all tuples of that group.
     *
     * @param mutableObjectIterator the tuple source to group
     * @param typeSerializer serializer handed to the key-grouped iterator
     * @param typeComparator comparator handed to the key-grouped iterator
     * @return an iterator with one summed value per key group; {@code remove()} is unsupported
     */
    private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> mutableObjectIterator, TypeSerializer<Tuple2<Integer, Integer>> typeSerializer, TypeComparator<Tuple2<Integer, Integer>> typeComparator) {
        final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>> groups = new ReusingKeyGroupedIterator<Tuple2<Integer, Integer>>(mutableObjectIterator, typeSerializer, typeComparator);
        return new Iterator<Integer>() {
            /** True when a key group has been advanced to but not yet consumed by next(). */
            private boolean groupPending = false;

            @Override
            public boolean hasNext() {
                if (!this.groupPending) {
                    try {
                        this.groupPending = groups.nextKey();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                return this.groupPending;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // the pending group is consumed by this call
                this.groupPending = false;
                int sum = 0;
                Iterator<Tuple2<Integer, Integer>> values = groups.getValues();
                while (values.hasNext()) {
                    sum += values.next().f1;
                }
                return sum;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
