package org.apache.hugegraph.computer.core.sort.sorter;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hugegraph.computer.core.combiner.IntValueSumCombiner;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.value.IntValue;
import org.apache.hugegraph.computer.core.io.BytesOutput;
import org.apache.hugegraph.computer.core.io.IOFactory;
import org.apache.hugegraph.computer.core.io.RandomAccessInput;
import org.apache.hugegraph.computer.core.sort.Sorter;
import org.apache.hugegraph.computer.core.sort.SorterTestUtil;
import org.apache.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import org.apache.hugegraph.computer.core.sort.flusher.CombineKvOuterSortFlusher;
import org.apache.hugegraph.computer.core.sort.flusher.KvOuterSortFlusher;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.store.StoreTestUtil;
import org.apache.hugegraph.computer.core.store.entry.DefaultKvEntry;
import org.apache.hugegraph.computer.core.store.entry.EntriesUtil;
import org.apache.hugegraph.computer.core.store.entry.InlinePointer;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;
import org.apache.hugegraph.computer.core.store.file.hgkvfile.HgkvDirImpl;
import org.apache.hugegraph.computer.core.store.file.hgkvfile.builder.HgkvDirBuilderImpl;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.util.Log;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/sort/sorter/SortLargeDataTest.class */
/**
 * Exercises the sorter with comparatively large inputs: sorting in-memory
 * buffers, merging sorted buffers into files and merging files into the
 * final output.
 *
 * <p>Every test works below {@link StoreTestUtil#FILE_DIR}; that directory
 * is deleted before and after each test.
 */
public class SortLargeDataTest {

    private static final Logger LOG = Log.logger(SortLargeDataTest.class);

    /** Shared configuration, built once in {@link #init()}. */
    private static Config CONFIG;

    /**
     * Builds the configuration shared by the tests that use {@link #CONFIG}.
     */
    @BeforeClass
    public static void init() {
        CONFIG = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MERGE_FILES_NUM, "200",
                // 1073741824 == 1024^3
                ComputerOptions.HGKV_MAX_FILE_SIZE, String.valueOf(1073741824L),
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
    }

    /**
     * Removes the working directory so each test starts from a clean state.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @Before
    public void setup() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    /**
     * Removes whatever the test left in the working directory.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @After
    public void teardown() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    /**
     * Runs the whole pipeline: random key/value pairs are written to a buffer,
     * each full buffer is sorted, every batch of sorted buffers is merged into
     * a file, and finally all files are merged into one output. The sum of the
     * values read back from the output must equal the sum of the values
     * written.
     */
    @Test
    public void testAllProcess() throws Exception {
        final int entryCount = 1000000;
        final int keyRange = 1000000;
        final int valueRange = 5;
        // 1048576 == 1024^2: sort the buffer once it holds this many bytes
        final long bufferFlushSize = 1048576L;
        final int buffersPerMerge = 300;

        StopWatch watcher = new StopWatch();
        long expectedSum = 0L;
        Random random = new Random();
        BytesOutput output = IOFactory.createBytesOutput(32);
        List<RandomAccessInput> buffers = new ArrayList<>(buffersPerMerge);
        List<String> mergedFiles = new ArrayList<>();
        int fileId = 10;
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);

        watcher.start();
        for (int i = 0; i < entryCount; i++) {
            // The last entry forces both a buffer sort and a buffer merge so
            // that no data is left behind.
            boolean lastEntry = i + 1 == entryCount;

            SorterTestUtil.writeData(output, random.nextInt(keyRange));
            int value = random.nextInt(valueRange);
            SorterTestUtil.writeData(output, value);
            expectedSum += value;

            if (output.position() >= bufferFlushSize || lastEntry) {
                buffers.add(sortBuffer(sorter,
                                       EntriesUtil.inputFromOutput(output)));
                // Rewind so the same output is reused for the next buffer
                output.seek(0L);
            }

            if (buffers.size() >= buffersPerMerge || lastEntry) {
                String mergedFile = StoreTestUtil.availablePathById(fileId++);
                mergedFiles.add(mergedFile);
                mergeBuffers(sorter, buffers, mergedFile);
                buffers.clear();
            }
        }

        String resultFile = StoreTestUtil.availablePathById("0");
        mergeFiles(sorter, mergedFiles, Lists.newArrayList(resultFile));
        watcher.stop();
        LOG.info("testAllProcess sort time: {}", watcher.getTime());

        long actualSum = sumOfEntryValue(sorter, ImmutableList.of(resultFile));
        Assert.assertEquals(expectedSum, actualSum);
    }

    /**
     * Sorts many random buffers, merges them into one file without combining
     * values, then checks both the sum of the values and the key order of the
     * merged file.
     */
    @Test
    public void testMergeBuffers() throws Exception {
        final int bufferNum = 2000;
        // Keep appending entries until a buffer reaches this many bytes
        final long bufferSize = 51200L;
        final int keyRange = 10000000;
        final int valueRange = 100;

        StopWatch watcher = new StopWatch();
        long expectedSum = 0L;
        Random random = new Random();
        List<RandomAccessInput> buffers = new ArrayList<>(bufferNum);
        for (int i = 0; i < bufferNum; i++) {
            BytesOutput output = IOFactory.createBytesOutput(32);
            while (output.position() < bufferSize) {
                SorterTestUtil.writeData(output, random.nextInt(keyRange));
                int value = random.nextInt(valueRange);
                SorterTestUtil.writeData(output, value);
                expectedSum += value;
            }
            buffers.add(EntriesUtil.inputFromOutput(output));
        }

        Sorter sorter = SorterTestUtil.createSorter(CONFIG);

        watcher.start();
        List<RandomAccessInput> sortedBuffers = new ArrayList<>(buffers.size());
        for (RandomAccessInput buffer : buffers) {
            sortedBuffers.add(sortBuffer(sorter, buffer));
        }
        watcher.stop();
        LOG.info("testMergeBuffers sort buffer cost time: {}",
                 watcher.getTime());

        String resultFile = StoreTestUtil.availablePathById("0");
        watcher.reset();
        watcher.start();
        sorter.mergeBuffers(sortedBuffers, new KvOuterSortFlusher(),
                            resultFile, false);
        watcher.stop();
        LOG.info("testMergeBuffers merge buffers cost time: {}",
                 watcher.getTime());

        List<String> resultFiles = ImmutableList.of(resultFile);
        Assert.assertEquals(expectedSum, sumOfEntryValue(sorter, resultFiles));
        assertFileOrder(sorter, resultFiles);
    }

    /**
     * Merges 1000 buffers of 100 entries each, every entry being key 1 with
     * value 1, and expects the values read back to add up to 100000.
     */
    @Test
    public void testMergeBuffersAllSameKey() throws Exception {
        final int bufferNum = 1000;
        final int entriesPerBuffer = 100;

        List<RandomAccessInput> buffers = new ArrayList<>(bufferNum);
        for (int i = 0; i < bufferNum; i++) {
            BytesOutput output = IOFactory.createBytesOutput(32);
            for (int j = 0; j < entriesPerBuffer; j++) {
                // key
                SorterTestUtil.writeData(output, 1);
                // value
                SorterTestUtil.writeData(output, 1);
            }
            buffers.add(EntriesUtil.inputFromOutput(output));
        }

        String resultFile = StoreTestUtil.availablePathById("0");
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);
        mergeBuffers(sorter, buffers, resultFile);

        Assert.assertEquals(100000L,
                            sumOfEntryValue(sorter,
                                            ImmutableList.of(resultFile)));
    }

    /**
     * Builds eleven input directories holding different numbers of entries,
     * merges them into four outputs and checks that the outputs together hold
     * as many entries as the inputs.
     */
    @Test
    public void testDiffNumEntriesFileMerge() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        List<Integer> sizeList = ImmutableList.of(200, 500, 20, 50, 300, 250,
                                                  10, 33, 900, 89, 20);

        List<String> inputs = new ArrayList<>(sizeList.size());
        for (int i = 0; i < sizeList.size(); i++) {
            // Input ids start at 10 so they do not clash with output ids 0-3
            String input = StoreTestUtil.availablePathById(i + 10);
            inputs.add(input);
            int entryCount = sizeList.get(i);
            try (HgkvDirBuilderImpl builder =
                         new HgkvDirBuilderImpl(config, input)) {
                for (int j = 0; j < entryCount; j++) {
                    byte[] keyBytes = StoreTestUtil.intToByteArray(j);
                    byte[] valueBytes = StoreTestUtil.intToByteArray(1);
                    builder.write(new DefaultKvEntry(
                            new InlinePointer(keyBytes),
                            new InlinePointer(valueBytes)));
                }
            }
        }

        List<String> outputs = ImmutableList.of(
                StoreTestUtil.availablePathById(0),
                StoreTestUtil.availablePathById(1),
                StoreTestUtil.availablePathById(2),
                StoreTestUtil.availablePathById(3));
        Sorter sorter = SorterTestUtil.createSorter(config);
        sorter.mergeInputs(inputs, new KvOuterSortFlusher(), outputs, false);

        int expectedEntries = sizeList.stream()
                                      .mapToInt(Integer::intValue)
                                      .sum();
        long mergedEntries = 0L;
        for (String output : outputs) {
            mergedEntries += HgkvDirImpl.open(output).numEntries();
        }
        Assert.assertEquals(expectedEntries, mergedEntries);
    }

    /**
     * Sorts one buffer with a sum-combining inner flusher.
     *
     * @param sorter sorter under test
     * @param input  buffer to sort
     * @return an input over the freshly written, sorted buffer
     */
    private static RandomAccessInput sortBuffer(Sorter sorter,
                                                RandomAccessInput input)
                                                throws Exception {
        BytesOutput output = IOFactory.createBytesOutput(32);
        CombineKvInnerSortFlusher flusher = new CombineKvInnerSortFlusher(
                output,
                SorterTestUtil.createPointerCombiner(
                        IntValue::new, new IntValueSumCombiner()));
        sorter.sortBuffer(input, flusher, false);
        return EntriesUtil.inputFromOutput(output);
    }

    /**
     * Merges buffers into {@code output} with a sum-combining outer flusher.
     *
     * @param sorter  sorter under test
     * @param buffers buffers to merge
     * @param output  path the merged result is written to
     */
    private static void mergeBuffers(Sorter sorter,
                                     List<RandomAccessInput> buffers,
                                     String output) throws Exception {
        CombineKvOuterSortFlusher flusher = new CombineKvOuterSortFlusher(
                SorterTestUtil.createPointerCombiner(
                        IntValue::new, new IntValueSumCombiner()));
        sorter.mergeBuffers(buffers, flusher, output, false);
    }

    /**
     * Merges input files into the output files with a sum-combining outer
     * flusher.
     *
     * @param sorter  sorter under test
     * @param inputs  paths of the files to merge
     * @param outputs paths the merged result is written to
     */
    private static void mergeFiles(Sorter sorter, List<String> inputs,
                                   List<String> outputs) throws Exception {
        CombineKvOuterSortFlusher flusher = new CombineKvOuterSortFlusher(
                SorterTestUtil.createPointerCombiner(
                        IntValue::new, new IntValueSumCombiner()));
        sorter.mergeInputs(inputs, flusher, outputs, false);
    }

    /**
     * Logs the total entry count of the given files and returns the sum of
     * all entry values read through the sorter's iterator.
     *
     * @param sorter sorter used to iterate over the files
     * @param files  paths of the files to read
     * @return sum of the integer values of all entries
     */
    private static long sumOfEntryValue(Sorter sorter, List<String> files)
                                        throws Exception {
        long entrySize = 0L;
        for (String file : files) {
            entrySize += HgkvDirImpl.open(file).numEntries();
        }
        LOG.info("Finally kvEntry size: {}", entrySize);

        // try-with-resources closes the iterator even if hasNext() throws
        try (PeekableIterator<KvEntry> entries =
                     sorter.iterator(files, false)) {
            long sum = 0L;
            while (entries.hasNext()) {
                sum += StoreTestUtil.dataFromPointer(entries.next().value());
            }
            return sum;
        }
    }

    /**
     * Asserts that the keys yielded by the sorter's iterator never decrease,
     * comparing every entry with the one directly before it.
     *
     * @param sorter sorter used to iterate over the files
     * @param files  paths of the files to check
     */
    private static void assertFileOrder(Sorter sorter, List<String> files)
                                        throws Exception {
        try (PeekableIterator<KvEntry> entries =
                     sorter.iterator(files, false)) {
            KvEntry previous = null;
            while (entries.hasNext()) {
                KvEntry current = entries.next();
                if (previous != null) {
                    // Same check as before: compareTo(...) result against 0
                    Assert.assertLte(0,
                                     previous.key().compareTo(current.key()));
                }
                // Advance the reference entry; the first entry only seeds it
                previous = current;
            }
        }
    }
}
