/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowWriter;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class BinaryExternalSorterTest {
    private static final int MEMORY_SIZE = 0x2000000;
    private static final Logger LOG = LoggerFactory.getLogger(BinaryExternalSorterTest.class);
    private IOManager ioManager = new IOManagerAsync();
    private MemoryManager memoryManager;
    private BinaryRowSerializer serializer;
    private Configuration conf = new Configuration();

    public BinaryExternalSorterTest(boolean spillCompress, boolean asyncMerge) {
        if (!spillCompress) {
            this.conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, false);
        }
        if (asyncMerge) {
            this.conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED, true);
        }
    }

    @Parameterized.Parameters(name="spillCompress-{0} asyncMerge-{1}")
    public static Collection<Boolean[]> parameters() {
        return Arrays.asList({false, false}, {false, true}, {true, false}, {true, true});
    }

    private static String getString(int count) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 8; ++i) {
            builder.append(count);
        }
        return builder.toString();
    }

    @Before
    public void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x2000000L).build();
        this.serializer = new BinaryRowSerializer(2);
        this.conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES, 128);
    }

    @After
    public void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null) {
            Assert.assertTrue((String)"Memory leak: not all segments have been returned to the memory manager.", (boolean)this.memoryManager.verifyEmpty());
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    public void testSortTwoBufferInMemory() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        MemoryManager memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x6500000L).build();
        long minMemorySize = memoryManager.computeNumberOfPages(1.0) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, this.conf, 1.0f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRow next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRow)iterator.next((Object)next);
            Assert.assertEquals((long)i, (long)next.getInt(0));
            Assert.assertEquals((Object)BinaryExternalSorterTest.getString(i), (Object)next.getString(1).toString());
        }
        sorter.close();
        Assert.assertTrue((boolean)memoryManager.verifyEmpty());
        memoryManager.shutdown();
    }

    @Test
    public void testSort() throws Exception {
        int size = 10000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.9) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, this.conf, 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRow next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRow)iterator.next((Object)next);
            Assert.assertEquals((long)i, (long)next.getInt(0));
            Assert.assertEquals((Object)BinaryExternalSorterTest.getString(i), (Object)next.getString(1).toString());
        }
        sorter.close();
    }

    @Test
    public void testSortIntStringWithRepeat() throws Exception {
        int size = 10000;
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.9) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)new IntNormalizedKeyComputer(){

            @Override
            public boolean isKeyFullyDetermines() {
                return false;
            }
        }, (RecordComparator)IntRecordComparator.INSTANCE, this.conf, 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)new MockBinaryRowReader(size));
        sorter.write((MutableObjectIterator)new MockBinaryRowReader(size));
        sorter.write((MutableObjectIterator)new MockBinaryRowReader(size));
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRow next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            for (int j = 0; j < 3; ++j) {
                next = (BinaryRow)iterator.next((Object)next);
                Assert.assertEquals((long)i, (long)next.getInt(0));
                Assert.assertEquals((Object)BinaryExternalSorterTest.getString(i), (Object)next.getString(1).toString());
            }
        }
        sorter.close();
    }

    @Test
    public void testSpilling() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.1) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, this.conf, 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRow next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRow)iterator.next((Object)next);
            Assert.assertEquals((long)i, (long)next.getInt(0));
            Assert.assertEquals((Object)BinaryExternalSorterTest.getString(i), (Object)next.getString(1).toString());
        }
        sorter.close();
    }

    @Test
    public void testSpillingDesc() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.1) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)new IntNormalizedKeyComputer(){

            @Override
            public boolean invertKey() {
                return true;
            }
        }, (RecordComparator)new IntRecordComparator(){

            @Override
            public int compare(BaseRow o1, BaseRow o2) {
                return -super.compare(o1, o2);
            }
        }, this.conf, 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        ArrayList<Tuple2> data = new ArrayList<Tuple2>();
        for (int i = 0; i < size; ++i) {
            data.add(new Tuple2((Object)i, (Object)BinaryExternalSorterTest.getString(i)));
        }
        data.sort((o1, o2) -> -((Integer)o1.f0).compareTo((Integer)o2.f0));
        BinaryRow next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRow)iterator.next((Object)next);
            Assert.assertEquals((long)((Integer)((Tuple2)data.get((int)i)).f0).intValue(), (long)next.getInt(0));
            Assert.assertEquals((Object)((Tuple2)data.get((int)i)).f1, (Object)next.getString(1).toString());
        }
        sorter.close();
    }

    @Test
    public void testMergeManyTimes() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.01) * 32768;
        this.conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES, 8);
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, this.conf, 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRow next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRow)iterator.next((Object)next);
            Assert.assertEquals((long)i, (long)next.getInt(0));
            Assert.assertEquals((Object)BinaryExternalSorterTest.getString(i), (Object)next.getString(1).toString());
        }
        sorter.close();
    }

    @Test
    public void testSpillingRandom() throws Exception {
        int i;
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.1) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, this.conf, 0.7f);
        sorter.startThreads();
        ArrayList<BinaryRow> data = new ArrayList<BinaryRow>();
        BinaryRow row = this.serializer.createInstance();
        for (i = 0; i < size; ++i) {
            row = reader.next(row);
            data.add(row.copy());
        }
        Collections.shuffle(data);
        for (i = 0; i < size; ++i) {
            sorter.write((BaseRow)data.get(i));
        }
        MutableObjectIterator iterator = sorter.getIterator();
        data.sort(Comparator.comparingInt(o -> o.getInt(0)));
        BinaryRow next = this.serializer.createInstance();
        for (int i2 = 0; i2 < size; ++i2) {
            next = (BinaryRow)iterator.next((Object)next);
            Assert.assertEquals((long)((BinaryRow)data.get(i2)).getInt(0), (long)next.getInt(0));
            Assert.assertEquals((Object)((BinaryRow)data.get(i2)).getString(1), (Object)next.getString(1));
        }
        sorter.close();
    }

    public class MockBinaryRowReader
    implements MutableObjectIterator<BinaryRow> {
        private int size;
        private int count;
        private BinaryRow row;
        private BinaryRowWriter writer;

        public MockBinaryRowReader(int size) {
            this.size = size;
            this.row = new BinaryRow(2);
            this.writer = new BinaryRowWriter(this.row);
        }

        public BinaryRow next(BinaryRow reuse) {
            return this.next();
        }

        public BinaryRow next() {
            if (this.count >= this.size) {
                return null;
            }
            this.writer.reset();
            this.writer.writeInt(0, this.count);
            this.writer.writeString(1, BinaryString.fromString((String)BinaryExternalSorterTest.getString(this.count)));
            this.writer.complete();
            ++this.count;
            return this.row;
        }
    }
}

