package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.class */
public class CombiningUnilateralSortMergerITCase {
    private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMergerITCase.class);
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = 1000;
    private static final int VALUE_LENGTH = 118;
    private static final int NUM_PAIRS = 50000;
    public static final int MEMORY_SIZE = 268435456;
    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;
    private TypeSerializerFactory<Record> serializerFactory;
    private TypeComparator<Record> comparator;

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase$TestCountCombiner.class */
    public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1;
        private final IntValue count = new IntValue();
        public volatile boolean opened = false;
        public volatile boolean closed = false;

        @Override // org.apache.flink.api.common.functions.RichGroupReduceFunction, org.apache.flink.api.common.functions.GroupCombineFunction
        public void combine(Iterable<Record> iterable, Collector<Record> collector) {
            Record record = null;
            int i = 0;
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                record = it.next();
                i += ((IntValue) record.getField(1, IntValue.class)).getValue();
            }
            this.count.setValue(i);
            record.setField(1, this.count);
            collector.collect(record);
        }

        @Override // org.apache.flink.api.common.functions.RichGroupReduceFunction, org.apache.flink.api.common.functions.GroupReduceFunction
        public void reduce(Iterable<Record> iterable, Collector<Record> collector) {
        }

        @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
        public void open(Configuration configuration) throws Exception {
            this.opened = true;
        }

        @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
        public void close() throws Exception {
            this.closed = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase$TestCountCombiner2.class */
    public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1;
        public volatile boolean opened = false;
        public volatile boolean closed = false;

        @Override // org.apache.flink.api.common.functions.RichGroupReduceFunction, org.apache.flink.api.common.functions.GroupCombineFunction
        public void combine(Iterable<Record> iterable, Collector<Record> collector) {
            Record record = null;
            int i = 0;
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                record = it.next();
                i += Integer.parseInt(((TestData.Value) record.getField(1, TestData.Value.class)).toString());
            }
            collector.collect(new Record(record.getField(0, TestData.Key.class), new TestData.Value(i + "")));
        }

        @Override // org.apache.flink.api.common.functions.RichGroupReduceFunction, org.apache.flink.api.common.functions.GroupReduceFunction
        public void reduce(Iterable<Record> iterable, Collector<Record> collector) {
        }

        @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
        public void open(Configuration configuration) throws Exception {
            this.opened = true;
        }

        @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
        public void close() throws Exception {
            this.closed = true;
        }
    }

    @Before
    public void beforeTest() {
        this.memoryManager = new DefaultMemoryManager(268435456L, 1);
        this.ioManager = new IOManagerAsync();
        this.serializerFactory = RecordSerializerFactory.get();
        this.comparator = new RecordComparator(new int[]{0}, new Class[]{TestData.Key.class});
    }

    @After
    public void afterTest() {
        this.ioManager.shutdown();
        if (!this.ioManager.isProperlyShutDown()) {
            Assert.fail("I/O Manager was not properly shut down.");
        }
        if (this.memoryManager != null) {
            Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", this.memoryManager.verifyEmpty());
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    public void testCombine() throws Exception {
        MockRecordReader mockRecordReader = new MockRecordReader();
        LOG.debug("initializing sortmerger");
        TestCountCombiner testCountCombiner = new TestCountCombiner();
        CombiningUnilateralSortMerger combiningUnilateralSortMerger = new CombiningUnilateralSortMerger(testCountCombiner, this.memoryManager, this.ioManager, mockRecordReader, this.parentTask, this.serializerFactory, this.comparator, 0.25d, 64, 0.7f);
        Record record = new Record();
        record.setField(1, new IntValue(1));
        TestData.Key key = new TestData.Key();
        for (int i = 0; i < 10000; i++) {
            for (int i2 = 0; i2 < 100; i2++) {
                key.setKey(i2);
                record.setField(0, key);
                mockRecordReader.emit(record);
            }
        }
        mockRecordReader.close();
        Iterator<Integer> reducingIterator = getReducingIterator(combiningUnilateralSortMerger.getIterator(), this.serializerFactory.getSerializer2(), this.comparator.duplicate2());
        while (reducingIterator.hasNext()) {
            Assert.assertEquals(10000, reducingIterator.next().intValue());
        }
        combiningUnilateralSortMerger.close();
        Assert.assertTrue(testCountCombiner.opened == testCountCombiner.closed);
    }

    @Test
    public void testCombineSpilling() throws Exception {
        MockRecordReader mockRecordReader = new MockRecordReader();
        LOG.debug("initializing sortmerger");
        TestCountCombiner testCountCombiner = new TestCountCombiner();
        CombiningUnilateralSortMerger combiningUnilateralSortMerger = new CombiningUnilateralSortMerger(testCountCombiner, this.memoryManager, this.ioManager, mockRecordReader, this.parentTask, this.serializerFactory, this.comparator, 0.01d, 64, 0.005f);
        Record record = new Record();
        record.setField(1, new IntValue(1));
        TestData.Key key = new TestData.Key();
        for (int i = 0; i < 10000; i++) {
            for (int i2 = 0; i2 < 100; i2++) {
                key.setKey(i2);
                record.setField(0, key);
                mockRecordReader.emit(record);
            }
        }
        mockRecordReader.close();
        Iterator<Integer> reducingIterator = getReducingIterator(combiningUnilateralSortMerger.getIterator(), this.serializerFactory.getSerializer2(), this.comparator.duplicate2());
        while (reducingIterator.hasNext()) {
            Assert.assertEquals(10000, reducingIterator.next().intValue());
        }
        combiningUnilateralSortMerger.close();
        Assert.assertTrue(testCountCombiner.opened == testCountCombiner.closed);
    }

    @Test
    public void testSortAndValidate() throws Exception {
        Hashtable hashtable = new Hashtable(1000);
        for (int i = 1; i <= 1000; i++) {
            hashtable.put(new TestData.Key(i), 0);
        }
        TestData.KeyComparator keyComparator = new TestData.KeyComparator();
        MockRecordReader mockRecordReader = new MockRecordReader();
        LOG.debug("initializing sortmerger");
        TestCountCombiner2 testCountCombiner2 = new TestCountCombiner2();
        CombiningUnilateralSortMerger combiningUnilateralSortMerger = new CombiningUnilateralSortMerger(testCountCombiner2, this.memoryManager, this.ioManager, mockRecordReader, this.parentTask, this.serializerFactory, this.comparator, 0.25d, 2, 0.7f);
        LOG.debug("emitting data");
        TestData.Generator generator = new TestData.Generator(SEED, 1000, 118, TestData.Generator.KeyMode.RANDOM, TestData.Generator.ValueMode.FIX_LENGTH);
        Record record = new Record();
        TestData.Value value = new TestData.Value("1");
        for (int i2 = 0; i2 < 50000; i2++) {
            Record next = generator.next(record);
            record = next;
            Assert.assertTrue(next != null);
            TestData.Key key = (TestData.Key) record.getField(0, TestData.Key.class);
            record.setField(1, value);
            mockRecordReader.emit(record);
            hashtable.put(new TestData.Key(key.getKey()), Integer.valueOf(((Integer) hashtable.get(key)).intValue() + 1));
        }
        mockRecordReader.close();
        MutableObjectIterator<E> iterator = combiningUnilateralSortMerger.getIterator();
        LOG.debug("checking results");
        Record record2 = new Record();
        Record record3 = new Record();
        Record record4 = (Record) iterator.next(record2);
        Record record5 = record4;
        Assert.assertTrue(record4 != null);
        hashtable.put(new TestData.Key(((TestData.Key) record5.getField(0, TestData.Key.class)).getKey()), Integer.valueOf(((Integer) hashtable.get(record5.getField(0, TestData.Key.class))).intValue() - Integer.parseInt(((TestData.Value) record5.getField(1, TestData.Value.class)).toString())));
        while (true) {
            Record record6 = (Record) iterator.next(record3);
            if (record6 == null) {
                break;
            }
            TestData.Key key2 = (TestData.Key) record5.getField(0, TestData.Key.class);
            TestData.Key key3 = (TestData.Key) record6.getField(0, TestData.Key.class);
            Assert.assertTrue(keyComparator.compare(key2, key3) <= 0);
            hashtable.put(new TestData.Key(key3.getKey()), Integer.valueOf(((Integer) hashtable.get(key3)).intValue() - Integer.parseInt(((TestData.Value) record6.getField(1, TestData.Value.class)).toString())));
            Record record7 = record5;
            record5 = record6;
            key2.setKey(key3.getKey());
            record3 = record7;
        }
        Iterator it = hashtable.values().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((Integer) it.next()).intValue() == 0);
        }
        combiningUnilateralSortMerger.close();
        Assert.assertTrue(testCountCombiner2.opened == testCountCombiner2.closed);
    }

    private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Record> mutableObjectIterator, TypeSerializer<Record> typeSerializer, TypeComparator<Record> typeComparator) {
        final ReusingKeyGroupedIterator reusingKeyGroupedIterator = new ReusingKeyGroupedIterator(mutableObjectIterator, typeSerializer, typeComparator);
        return new Iterator<Integer>() { // from class: org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMergerITCase.1
            private boolean hasNext = false;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.hasNext) {
                    return true;
                }
                try {
                    this.hasNext = ReusingKeyGroupedIterator.this.nextKey();
                    return this.hasNext;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.util.Iterator
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                this.hasNext = false;
                ReusingKeyGroupedIterator.ValuesIterator values = ReusingKeyGroupedIterator.this.getValues();
                int i = 0;
                while (true) {
                    int i2 = i;
                    if (!values.hasNext()) {
                        return Integer.valueOf(i2);
                    }
                    i = i2 + ((IntValue) ((Record) values.next()).getField(1, IntValue.class)).getValue();
                }
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
