/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.hashtable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.hashtable.BinaryHashTableTest;
import org.apache.flink.table.runtime.hashtable.LongHashPartition;
import org.apache.flink.table.runtime.hashtable.LongHybridHashTable;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class LongHashTableTest {
    private static final int PAGE_SIZE = 32768;
    private IOManager ioManager;
    private BinaryRowSerializer buildSideSerializer;
    private BinaryRowSerializer probeSideSerializer;
    private MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x1C00000L).build();
    private boolean useCompress;
    private Configuration conf;

    public LongHashTableTest(boolean useCompress) {
        this.useCompress = useCompress;
    }

    @Parameterized.Parameters(name="useCompress-{0}")
    public static List<Boolean> getVarSeg() {
        return Arrays.asList(true, false);
    }

    @Before
    public void init() {
        TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT};
        this.buildSideSerializer = new BinaryRowSerializer(types.length);
        this.probeSideSerializer = new BinaryRowSerializer(types.length);
        this.ioManager = new IOManagerAsync();
        this.conf = new Configuration();
        this.conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, this.useCompress);
    }

    @Test
    public void testInMemory() throws IOException {
        int numKeys = 100000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(100000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(100000, 10, true);
        MyHashTable table = new MyHashTable(0xFA0000L);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)3000000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testSpillingHashJoinOneRecursion() throws IOException {
        int numKeys = 100000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(100000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(100000, 10, true);
        MyHashTable table = new MyHashTable(0x960000L);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)3000000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testSpillingHashJoinOneRecursionPerformance() throws IOException {
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(1000000, 10, true);
        MyHashTable table = new MyHashTable(0x320000L);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)30000000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testSpillingHashJoinOneRecursionValidity() throws IOException {
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(1000000, 10, true);
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(1000000);
        MyHashTable table = new MyHashTable(0x320000L);
        BinaryRow buildRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next(buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next(probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            this.testJoin(table, map);
        }
        while (table.nextMatching()) {
            this.testJoin(table, map);
        }
        table.close();
        Assert.assertEquals((String)"Wrong number of keys", (long)1000000L, (long)map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assert.assertEquals((String)("Wrong number of values in per-key cross product for key " + key), (long)30L, (long)val);
        }
        table.free();
    }

    @Test
    public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
        int repeatedValue1 = 40559;
        int repeatedValue2 = 92882;
        int repeatedValueCountBuild = 200000;
        int repeatedValueCountProbe = 5;
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator build1 = new UniformBinaryRowGenerator(1000000, 3, false);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator build2 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(40559, 17, 200000);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator build3 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(92882, 23, 200000);
        ArrayList<Object> builds = new ArrayList<Object>();
        builds.add(build1);
        builds.add(build2);
        builds.add(build3);
        UnionIterator buildInput = new UnionIterator(builds);
        UniformBinaryRowGenerator probe1 = new UniformBinaryRowGenerator(1000000, 10, true);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator probe2 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(40559, 17, 5);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator probe3 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(92882, 23, 5);
        ArrayList<Object> probes = new ArrayList<Object>();
        probes.add(probe1);
        probes.add(probe2);
        probes.add(probe3);
        UnionIterator probeInput = new UnionIterator(probes);
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(1000000);
        MyHashTable table = new MyHashTable(0x1C00000L);
        BinaryRow buildRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next((Object)buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next((Object)probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            this.testJoin(table, map);
        }
        while (table.nextMatching()) {
            this.testJoin(table, map);
        }
        table.close();
        Assert.assertEquals((String)"Wrong number of keys", (long)1000000L, (long)map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assert.assertEquals((String)("Wrong number of values in per-key cross product for key " + key), (long)(key == 40559 || key == 92882 ? 3000045L : 30L), (long)val);
        }
        table.free();
    }

    @Test
    public void testSpillingHashJoinWithTwoRecursions() throws IOException {
        int repeatedValue1 = 40559;
        int repeatedValue2 = 92882;
        int repeatedValueCountBuild = 200000;
        int repeatedValueCountProbe = 5;
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator build1 = new UniformBinaryRowGenerator(1000000, 3, false);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator build2 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(40559, 17, 200000);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator build3 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(92882, 23, 200000);
        ArrayList<Object> builds = new ArrayList<Object>();
        builds.add(build1);
        builds.add(build2);
        builds.add(build3);
        UnionIterator buildInput = new UnionIterator(builds);
        UniformBinaryRowGenerator probe1 = new UniformBinaryRowGenerator(1000000, 10, true);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator probe2 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(40559, 17, 5);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator probe3 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(92882, 23, 5);
        ArrayList<Object> probes = new ArrayList<Object>();
        probes.add(probe1);
        probes.add(probe2);
        probes.add(probe3);
        UnionIterator probeInput = new UnionIterator(probes);
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(1000000);
        MyHashTable table = new MyHashTable(0x1C00000L);
        BinaryRow buildRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next((Object)buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next((Object)probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            this.testJoin(table, map);
        }
        while (table.nextMatching()) {
            this.testJoin(table, map);
        }
        table.close();
        Assert.assertEquals((String)"Wrong number of keys", (long)1000000L, (long)map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assert.assertEquals((String)("Wrong number of values in per-key cross product for key " + key), (long)(key == 40559 || key == 92882 ? 3000045L : 30L), (long)val);
        }
        table.free();
    }

    @Test
    public void testFailingHashJoinTooManyRecursions() throws IOException {
        int repeatedValue1 = 40559;
        int repeatedValue2 = 92882;
        int repeatedValueCount = 3000000;
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator build1 = new UniformBinaryRowGenerator(1000000, 3, false);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator build2 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(40559, 17, 3000000);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator build3 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(92882, 23, 3000000);
        ArrayList<Object> builds = new ArrayList<Object>();
        builds.add(build1);
        builds.add(build2);
        builds.add(build3);
        UnionIterator buildInput = new UnionIterator(builds);
        UniformBinaryRowGenerator probe1 = new UniformBinaryRowGenerator(1000000, 10, true);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator probe2 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(40559, 17, 3000000);
        BinaryHashTableTest.ConstantsKeyValuePairsIterator probe3 = new BinaryHashTableTest.ConstantsKeyValuePairsIterator(92882, 23, 3000000);
        ArrayList<Object> probes = new ArrayList<Object>();
        probes.add(probe1);
        probes.add(probe2);
        probes.add(probe3);
        UnionIterator probeInput = new UnionIterator(probes);
        MyHashTable table = new MyHashTable(0x1C00000L);
        try {
            this.join(table, (MutableObjectIterator<BinaryRow>)buildInput, (MutableObjectIterator<BinaryRow>)probeInput);
            Assert.fail((String)"Hash Join must have failed due to too many recursions.");
        }
        catch (Exception exception) {
            // empty catch block
        }
        table.close();
        table.free();
    }

    @Test
    public void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
        int numBuildKeys = 1000000;
        boolean numBuildVals = true;
        int numProbeKeys = 20;
        boolean numProbeVals = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 1, false);
        MyHashTable table = new MyHashTable(0x320000L);
        int expectedNumResults = Math.min(20, 1000000) * 1 * 1;
        int numRecordsInJoinResult = this.join(table, buildInput, new UniformBinaryRowGenerator(20, 1, true));
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)expectedNumResults, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void validateSpillingDuringInsertion() throws IOException, MemoryAllocationException {
        int numBuildKeys = 500000;
        boolean numBuildVals = true;
        int numProbeKeys = 10;
        boolean numProbeVals = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(500000, 1, false);
        MyHashTable table = new MyHashTable(2785280L);
        int expectedNumResults = Math.min(10, 500000) * 1 * 1;
        int numRecordsInJoinResult = this.join(table, buildInput, new UniformBinaryRowGenerator(10, 1, true));
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)expectedNumResults, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testBucketsNotFulfillSegment() throws Exception {
        int numKeys = 10000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(10000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(10000, 10, true);
        MyHashTable table = new MyHashTable(0x118000L);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)300000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    private void testJoin(MyHashTable table, HashMap<Integer, Long> map) throws IOException {
        BinaryRow record;
        int numBuildValues = 0;
        BaseRow probeRec = table.getCurrentProbeRow();
        int key = probeRec.getInt(0);
        LongHashPartition.MatchIterator buildSide = table.getBuildSideIterator();
        if (buildSide.advanceNext()) {
            numBuildValues = 1;
            record = (BinaryRow)buildSide.getRow();
            Assert.assertEquals((String)"Probe-side key was different than build-side key.", (long)key, (long)record.getInt(0));
        } else {
            Assert.fail((String)"No build side values found for a probe key.");
        }
        while (buildSide.advanceNext()) {
            ++numBuildValues;
            record = (BinaryRow)buildSide.getRow();
            Assert.assertEquals((String)"Probe-side key was different than build-side key.", (long)key, (long)record.getInt(0));
        }
        Long contained = map.get(key);
        contained = contained == null ? Long.valueOf(numBuildValues) : Long.valueOf(contained + (long)numBuildValues);
        map.put(key, contained);
    }

    private int join(MyHashTable table, MutableObjectIterator<BinaryRow> buildInput, MutableObjectIterator<BinaryRow> probeInput) throws IOException {
        BinaryRow buildRow;
        int count = 0;
        BinaryRow reuseBuildSizeRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next((Object)reuseBuildSizeRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next((Object)probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            count += this.joinWithNextKey(table);
        }
        while (table.nextMatching()) {
            count += this.joinWithNextKey(table);
        }
        return count;
    }

    private int joinWithNextKey(MyHashTable table) throws IOException {
        BinaryRow buildRow;
        int count = 0;
        LongHashPartition.MatchIterator buildIterator = table.getBuildSideIterator();
        BaseRow probeRow = table.getCurrentProbeRow();
        BinaryRow binaryRow = buildRow = buildIterator.advanceNext() ? (BinaryRow)buildIterator.getRow() : null;
        if (probeRow != null && buildRow != null) {
            ++count;
            while (buildIterator.advanceNext()) {
                ++count;
            }
        }
        return count;
    }

    private class MyHashTable
    extends LongHybridHashTable {
        public MyHashTable(long memorySize) {
            super(LongHashTableTest.this.conf, (Object)LongHashTableTest.this, LongHashTableTest.this.buildSideSerializer, LongHashTableTest.this.probeSideSerializer, LongHashTableTest.this.memManager, memorySize, LongHashTableTest.this.ioManager, 24, 200000L);
        }

        public long getBuildLongKey(BaseRow row) {
            return row.getInt(0);
        }

        public long getProbeLongKey(BaseRow row) {
            return row.getInt(0);
        }

        public BinaryRow probeToBinary(BaseRow row) {
            return (BinaryRow)row;
        }
    }
}

