/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.hashtable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowWriter;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea;
import org.apache.flink.table.runtime.hashtable.BinaryHashTable;
import org.apache.flink.table.runtime.operators.join.HashJoinType;
import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.table.runtime.util.RowIterator;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BinaryHashTableTest {
    private static final int PAGE_SIZE = 32768;
    private IOManager ioManager;
    private BinaryRowSerializer buildSideSerializer;
    private BinaryRowSerializer probeSideSerializer;
    private boolean useCompress;
    private Configuration conf;

    public BinaryHashTableTest(boolean useCompress) {
        this.useCompress = useCompress;
    }

    @Parameterized.Parameters(name="useCompress-{0}")
    public static List<Boolean> getVarSeg() {
        return Arrays.asList(true, false);
    }

    @Before
    public void setup() {
        TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT};
        this.buildSideSerializer = new BinaryRowSerializer(types.length);
        this.probeSideSerializer = new BinaryRowSerializer(types.length);
        this.ioManager = new IOManagerAsync();
        this.conf = new Configuration();
        this.conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, this.useCompress);
    }

    @After
    public void tearDown() throws Exception {
        this.ioManager.close();
    }

    @Test
    public void testIOBufferCountComputation() {
        Assert.assertEquals((long)1L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)32));
        Assert.assertEquals((long)1L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)33));
        Assert.assertEquals((long)1L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)40));
        Assert.assertEquals((long)1L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)64));
        Assert.assertEquals((long)1L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)127));
        Assert.assertEquals((long)2L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)128));
        Assert.assertEquals((long)2L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)129));
        Assert.assertEquals((long)2L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)511));
        Assert.assertEquals((long)3L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)512));
        Assert.assertEquals((long)3L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)513));
        Assert.assertEquals((long)3L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)2047));
        Assert.assertEquals((long)4L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)2048));
        Assert.assertEquals((long)4L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)2049));
        Assert.assertEquals((long)4L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)8191));
        Assert.assertEquals((long)5L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)8192));
        Assert.assertEquals((long)5L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)8193));
        Assert.assertEquals((long)5L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)Short.MAX_VALUE));
        Assert.assertEquals((long)6L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)32768));
        Assert.assertEquals((long)6L, (long)BinaryHashTable.getNumWriteBehindBuffers((int)Integer.MAX_VALUE));
    }

    @Test
    public void testInMemoryMutableHashTable() throws IOException {
        int numKeys = 100000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(100000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(100000, 10, true);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x1C00000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x320000L, this.ioManager);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)3000000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    private int join(BinaryHashTable table, MutableObjectIterator<BinaryRow> buildInput, MutableObjectIterator<BinaryRow> probeInput) throws IOException {
        return this.join(table, buildInput, probeInput, false);
    }

    private int join(BinaryHashTable table, MutableObjectIterator<BinaryRow> buildInput, MutableObjectIterator<BinaryRow> probeInput, boolean buildOuterJoin) throws IOException {
        BinaryRow buildRow;
        int count = 0;
        BinaryRow reuseBuildSizeRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next((Object)reuseBuildSizeRow)) != null) {
            table.putBuildRow((BaseRow)buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next((Object)probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            count += this.joinWithNextKey(table, buildOuterJoin);
        }
        while (table.nextMatching()) {
            count += this.joinWithNextKey(table, buildOuterJoin);
        }
        return count;
    }

    private int joinWithNextKey(BinaryHashTable table, boolean buildOuterJoin) throws IOException {
        int count;
        block3: {
            BinaryRow buildRow;
            BaseRow probeRow;
            RowIterator buildIterator;
            block2: {
                count = 0;
                buildIterator = table.getBuildSideIterator();
                probeRow = table.getCurrentProbeRow();
                BinaryRow binaryRow = buildRow = buildIterator.advanceNext() ? (BinaryRow)buildIterator.getRow() : null;
                if (probeRow == null || buildRow == null) break block2;
                ++count;
                while (buildIterator.advanceNext()) {
                    ++count;
                }
                break block3;
            }
            if (!buildOuterJoin || probeRow != null || buildRow == null) break block3;
            ++count;
            while (buildIterator.advanceNext()) {
                ++count;
            }
        }
        return count;
    }

    @Test
    public void testSpillingHashJoinOneRecursionPerformance() throws IOException {
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(1000000, 10, true);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x640000L).setPageSize(32768).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x320000L, this.ioManager);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)30000000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testSpillingHashJoinOneRecursionValidity() throws IOException {
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(1000000, 10, true);
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(1000000);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x1C00000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x320000L, this.ioManager);
        BinaryRow recordReuse = new BinaryRow(2);
        BinaryRow buildRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next(buildRow)) != null) {
            table.putBuildRow((BaseRow)buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next(probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            this.testJoin(table, map);
        }
        while (table.nextMatching()) {
            this.testJoin(table, map);
        }
        table.close();
        Assert.assertEquals((String)"Wrong number of keys", (long)1000000L, (long)map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assert.assertEquals((String)("Wrong number of values in per-key cross product for key " + key), (long)30L, (long)val);
        }
        table.free();
    }

    @Test
    public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
        int repeatedValue1 = 40559;
        int repeatedValue2 = 92882;
        int repeatedValueCountBuild = 200000;
        int repeatedValueCountProbe = 5;
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator build1 = new UniformBinaryRowGenerator(1000000, 3, false);
        ConstantsKeyValuePairsIterator build2 = new ConstantsKeyValuePairsIterator(40559, 17, 200000);
        ConstantsKeyValuePairsIterator build3 = new ConstantsKeyValuePairsIterator(92882, 23, 200000);
        ArrayList<Object> builds = new ArrayList<Object>();
        builds.add(build1);
        builds.add(build2);
        builds.add(build3);
        UnionIterator buildInput = new UnionIterator(builds);
        UniformBinaryRowGenerator probe1 = new UniformBinaryRowGenerator(1000000, 10, true);
        ConstantsKeyValuePairsIterator probe2 = new ConstantsKeyValuePairsIterator(40559, 17, 5);
        ConstantsKeyValuePairsIterator probe3 = new ConstantsKeyValuePairsIterator(92882, 23, 5);
        ArrayList<Object> probes = new ArrayList<Object>();
        probes.add(probe1);
        probes.add(probe2);
        probes.add(probe3);
        UnionIterator probeInput = new UnionIterator(probes);
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(1000000);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x1C00000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x1C00000L, this.ioManager);
        BinaryRow recordReuse = new BinaryRow(2);
        BinaryRow buildRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next((Object)buildRow)) != null) {
            table.putBuildRow((BaseRow)buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next((Object)probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            this.testJoin(table, map);
        }
        while (table.nextMatching()) {
            this.testJoin(table, map);
        }
        table.close();
        Assert.assertEquals((String)"Wrong number of keys", (long)1000000L, (long)map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assert.assertEquals((String)("Wrong number of values in per-key cross product for key " + key), (long)(key == 40559 || key == 92882 ? 3000045L : 30L), (long)val);
        }
        table.free();
    }

    private void testJoin(BinaryHashTable table, HashMap<Integer, Long> map) throws IOException {
        BinaryRow record;
        int numBuildValues = 0;
        BaseRow probeRec = table.getCurrentProbeRow();
        int key = probeRec.getInt(0);
        RowIterator buildSide = table.getBuildSideIterator();
        if (buildSide.advanceNext()) {
            numBuildValues = 1;
            record = (BinaryRow)buildSide.getRow();
            Assert.assertEquals((String)"Probe-side key was different than build-side key.", (long)key, (long)record.getInt(0));
        } else {
            Assert.fail((String)"No build side values found for a probe key.");
        }
        while (buildSide.advanceNext()) {
            ++numBuildValues;
            record = (BinaryRow)buildSide.getRow();
            Assert.assertEquals((String)"Probe-side key was different than build-side key.", (long)key, (long)record.getInt(0));
        }
        Long contained = map.get(key);
        contained = contained == null ? Long.valueOf(numBuildValues) : Long.valueOf(contained + (long)numBuildValues);
        map.put(key, contained);
    }

    @Test
    public void testSpillingHashJoinWithTwoRecursions() throws IOException {
        int repeatedValue1 = 40559;
        int repeatedValue2 = 92882;
        int repeatedValueCountBuild = 200000;
        int repeatedValueCountProbe = 5;
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator build1 = new UniformBinaryRowGenerator(1000000, 3, false);
        ConstantsKeyValuePairsIterator build2 = new ConstantsKeyValuePairsIterator(40559, 17, 200000);
        ConstantsKeyValuePairsIterator build3 = new ConstantsKeyValuePairsIterator(92882, 23, 200000);
        ArrayList<Object> builds = new ArrayList<Object>();
        builds.add(build1);
        builds.add(build2);
        builds.add(build3);
        UnionIterator buildInput = new UnionIterator(builds);
        UniformBinaryRowGenerator probe1 = new UniformBinaryRowGenerator(1000000, 10, true);
        ConstantsKeyValuePairsIterator probe2 = new ConstantsKeyValuePairsIterator(40559, 17, 5);
        ConstantsKeyValuePairsIterator probe3 = new ConstantsKeyValuePairsIterator(92882, 23, 5);
        ArrayList<Object> probes = new ArrayList<Object>();
        probes.add(probe1);
        probes.add(probe2);
        probes.add(probe3);
        UnionIterator probeInput = new UnionIterator(probes);
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(1000000);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x1C00000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x1C00000L, this.ioManager);
        BinaryRow recordReuse = new BinaryRow(2);
        BinaryRow buildRow = this.buildSideSerializer.createInstance();
        while ((buildRow = (BinaryRow)buildInput.next((Object)buildRow)) != null) {
            table.putBuildRow((BaseRow)buildRow);
        }
        table.endBuild();
        BinaryRow probeRow = this.probeSideSerializer.createInstance();
        while ((probeRow = (BinaryRow)probeInput.next((Object)probeRow)) != null) {
            if (!table.tryProbe((BaseRow)probeRow)) continue;
            this.testJoin(table, map);
        }
        while (table.nextMatching()) {
            this.testJoin(table, map);
        }
        table.close();
        Assert.assertEquals((String)"Wrong number of keys", (long)1000000L, (long)map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assert.assertEquals((String)("Wrong number of values in per-key cross product for key " + key), (long)(key == 40559 || key == 92882 ? 3000045L : 30L), (long)val);
        }
        table.free();
    }

    @Test
    public void testFailingHashJoinTooManyRecursions() throws IOException {
        int repeatedValue1 = 40559;
        int repeatedValue2 = 92882;
        int repeatedValueCount = 3000000;
        int numKeys = 1000000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator build1 = new UniformBinaryRowGenerator(1000000, 3, false);
        ConstantsKeyValuePairsIterator build2 = new ConstantsKeyValuePairsIterator(40559, 17, 3000000);
        ConstantsKeyValuePairsIterator build3 = new ConstantsKeyValuePairsIterator(92882, 23, 3000000);
        ArrayList<Object> builds = new ArrayList<Object>();
        builds.add(build1);
        builds.add(build2);
        builds.add(build3);
        UnionIterator buildInput = new UnionIterator(builds);
        UniformBinaryRowGenerator probe1 = new UniformBinaryRowGenerator(1000000, 10, true);
        ConstantsKeyValuePairsIterator probe2 = new ConstantsKeyValuePairsIterator(40559, 17, 3000000);
        ConstantsKeyValuePairsIterator probe3 = new ConstantsKeyValuePairsIterator(92882, 23, 3000000);
        ArrayList<Object> probes = new ArrayList<Object>();
        probes.add(probe1);
        probes.add(probe2);
        probes.add(probe3);
        UnionIterator probeInput = new UnionIterator(probes);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x1C00000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x1C00000L, this.ioManager);
        try {
            this.join(table, (MutableObjectIterator<BinaryRow>)buildInput, (MutableObjectIterator<BinaryRow>)probeInput);
            Assert.fail((String)"Hash Join must have failed due to too many recursions.");
        }
        catch (Exception exception) {
            // empty catch block
        }
        table.close();
        table.free();
    }

    @Test
    public void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
        int numBuildKeys = 1000000;
        boolean numBuildVals = true;
        int numProbeKeys = 20;
        boolean numProbeVals = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 1, false);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x400000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x320000L, this.ioManager);
        int expectedNumResults = Math.min(20, 1000000) * 1 * 1;
        int numRecordsInJoinResult = this.join(table, buildInput, new UniformBinaryRowGenerator(20, 1, true));
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)expectedNumResults, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testSparseProbeSpillingWithOuterJoin() throws IOException {
        int numBuildKeys = 1000000;
        boolean numBuildVals = true;
        int numProbeKeys = 20;
        boolean numProbeVals = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(1000000, 1, false);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x300000L).build();
        BinaryHashTable table = new BinaryHashTable(this.conf, new Object(), (AbstractRowSerializer)this.buildSideSerializer, (AbstractRowSerializer)this.probeSideSerializer, (Projection)new MyProjection(), (Projection)new MyProjection(), memManager, 0x300000L, this.ioManager, 24, 200000L, true, HashJoinType.BUILD_OUTER, null, true, new boolean[]{true}, false);
        int expectedNumResults = Math.max(20, 1000000) * 1 * 1;
        int numRecordsInJoinResult = this.join(table, buildInput, new UniformBinaryRowGenerator(20, 1, true), true);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)expectedNumResults, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void validateSpillingDuringInsertion() throws IOException, MemoryAllocationException {
        int numBuildKeys = 500000;
        boolean numBuildVals = true;
        int numProbeKeys = 10;
        boolean numProbeVals = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(500000, 1, false);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(2785280L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 2785280L, this.ioManager);
        int expectedNumResults = Math.min(10, 500000) * 1 * 1;
        int numRecordsInJoinResult = this.join(table, buildInput, new UniformBinaryRowGenerator(10, 1, true));
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)expectedNumResults, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testBucketsNotFulfillSegment() throws Exception {
        int numKeys = 10000;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(10000, 3, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(10000, 10, true);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x118000L).build();
        BinaryHashTable table = new BinaryHashTable(this.conf, new Object(), (AbstractRowSerializer)this.buildSideSerializer, (AbstractRowSerializer)this.probeSideSerializer, (Projection)new MyProjection(), (Projection)new MyProjection(), memManager, 0x118000L, this.ioManager, 24, 200000L, true, HashJoinType.INNER, null, false, new boolean[]{true}, false);
        for (MemorySegment segment : table.getFreedMemory()) {
            int newBucketOffset = segment.size() - 128;
            segment.put(newBucketOffset, (byte)0);
            segment.put(newBucketOffset + 1, (byte)0);
            segment.putShort(newBucketOffset + 2, (short)-1);
            segment.putLong(newBucketOffset + 4, -1L);
        }
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)300000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testHashWithBuildSideOuterJoin1() throws Exception {
        int numKeys = 20000;
        boolean buildValsPerKey = true;
        boolean probeValsPerKey = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(40000, 1, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(20000, 1, true);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x118000L).build();
        BinaryHashTable table = new BinaryHashTable(this.conf, new Object(), (AbstractRowSerializer)this.buildSideSerializer, (AbstractRowSerializer)this.probeSideSerializer, (Projection)new MyProjection(), (Projection)new MyProjection(), memManager, 0x118000L, this.ioManager, 24, 200000L, true, HashJoinType.BUILD_OUTER, null, true, new boolean[]{true}, false);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput, true);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)40000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testHashWithBuildSideOuterJoin2() throws Exception {
        int numKeys = 40000;
        int buildValsPerKey = 2;
        boolean probeValsPerKey = true;
        UniformBinaryRowGenerator buildInput = new UniformBinaryRowGenerator(40000, 2, false);
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(40000, 1, true);
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x118000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x108000L, this.ioManager);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput, true);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)80000L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testRepeatBuildJoin() throws Exception {
        int numKeys = 500;
        boolean probeValsPerKey = true;
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x140000L).build();
        MutableObjectIterator<BinaryRow> buildInput = new MutableObjectIterator<BinaryRow>(){
            int cnt = 0;

            public BinaryRow next(BinaryRow reuse) throws IOException {
                return this.next();
            }

            public BinaryRow next() throws IOException {
                ++this.cnt;
                if (this.cnt > 500) {
                    return null;
                }
                BinaryRow row = new BinaryRow(2);
                BinaryRowWriter writer = new BinaryRowWriter(row);
                writer.writeInt(0, 1);
                writer.writeInt(1, 1);
                writer.complete();
                return row;
            }
        };
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(500, 1, true);
        BinaryHashTable table = new BinaryHashTable(this.conf, new Object(), (AbstractRowSerializer)this.buildSideSerializer, (AbstractRowSerializer)this.probeSideSerializer, (Projection)new MyProjection(), (Projection)new MyProjection(), memManager, 0x140000L, this.ioManager, 24, 200000L, true, HashJoinType.INNER, null, false, new boolean[]{true}, true);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput, true);
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)1L, (long)numRecordsInJoinResult);
        table.close();
        table.free();
    }

    @Test
    public void testRepeatBuildJoinWithSpill() throws Exception {
        int numKeys = 30000;
        int numRows = 300000;
        boolean probeValsPerKey = true;
        MutableObjectIterator<BinaryRow> buildInput = new MutableObjectIterator<BinaryRow>(){
            int cnt = 0;

            public BinaryRow next(BinaryRow reuse) throws IOException {
                return this.next();
            }

            public BinaryRow next() throws IOException {
                ++this.cnt;
                if (this.cnt > 300000) {
                    return null;
                }
                int value = this.cnt % 30000;
                BinaryRow row = new BinaryRow(2);
                BinaryRowWriter writer = new BinaryRowWriter(row);
                writer.writeInt(0, value);
                writer.writeInt(1, value);
                writer.complete();
                return row;
            }
        };
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x118000L).build();
        UniformBinaryRowGenerator probeInput = new UniformBinaryRowGenerator(30000, 1, true);
        BinaryHashTable table = new BinaryHashTable(this.conf, new Object(), (AbstractRowSerializer)this.buildSideSerializer, (AbstractRowSerializer)this.probeSideSerializer, (Projection)new MyProjection(), (Projection)new MyProjection(), memManager, 0x118000L, this.ioManager, 24, 200000L, true, HashJoinType.INNER, null, false, new boolean[]{true}, true);
        int numRecordsInJoinResult = this.join(table, buildInput, probeInput, true);
        Assert.assertTrue((String)"Wrong number of records in join result.", (numRecordsInJoinResult < 300000 ? 1 : 0) != 0);
        table.close();
        table.free();
    }

    @Test
    public void testBinaryHashBucketAreaNotEnoughMem() throws IOException {
        MemoryManager memManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x118000L).build();
        BinaryHashTable table = this.newBinaryHashTable(this.buildSideSerializer, this.probeSideSerializer, new MyProjection(), new MyProjection(), memManager, 0x118000L, this.ioManager);
        BinaryHashBucketArea area = new BinaryHashBucketArea(table, 100.0, 1, false);
        for (int i = 0; i < 100000; ++i) {
            area.insertToBucket(i, i, true);
        }
        area.freeMemory();
        table.close();
        Assert.assertEquals((long)35L, (long)table.getFreedMemory().size());
    }

    private BinaryHashTable newBinaryHashTable(BinaryRowSerializer buildSideSerializer, BinaryRowSerializer probeSideSerializer, Projection<BaseRow, BinaryRow> buildSideProjection, Projection<BaseRow, BinaryRow> probeSideProjection, MemoryManager memoryManager, long memory, IOManager ioManager) {
        return new BinaryHashTable(this.conf, new Object(), (AbstractRowSerializer)buildSideSerializer, (AbstractRowSerializer)probeSideSerializer, buildSideProjection, probeSideProjection, memoryManager, memory, ioManager, 24, 200000L, true, HashJoinType.INNER, null, false, new boolean[]{true}, false);
    }

    private static final class MyProjection
    implements Projection<BaseRow, BinaryRow> {
        BinaryRow innerRow = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(this.innerRow);

        private MyProjection() {
        }

        public BinaryRow apply(BaseRow row) {
            this.writer.reset();
            if (row.isNullAt(0)) {
                this.writer.setNullAt(0);
            } else {
                this.writer.writeInt(0, row.getInt(0));
            }
            this.writer.complete();
            return this.innerRow;
        }
    }

    public static final class ConstantsKeyValuePairsIterator
    implements MutableObjectIterator<BinaryRow> {
        private final IntValue key;
        private final IntValue value;
        private int numLeft;

        public ConstantsKeyValuePairsIterator(int key, int value, int count) {
            this.key = new IntValue(key);
            this.value = new IntValue(value);
            this.numLeft = count;
        }

        public BinaryRow next(BinaryRow reuse) {
            if (this.numLeft > 0) {
                --this.numLeft;
                BinaryRowWriter writer = new BinaryRowWriter(reuse);
                writer.writeInt(0, this.key.getValue());
                writer.writeInt(1, this.value.getValue());
                writer.complete();
                return reuse;
            }
            return null;
        }

        public BinaryRow next() {
            return this.next(new BinaryRow(2));
        }
    }
}

