package org.apache.hyracks.dataflow.std.join;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.BitSet;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.class */
public class OptimizedHybridHashJoin {
    private static final Logger LOGGER = LogManager.getLogger();
    private FrameTupleAppender bigFrameAppender;
    private final IHyracksJobletContext jobletCtx;
    private final String buildRelName;
    private final String probeRelName;
    private final ITuplePartitionComputer buildHpc;
    private final ITuplePartitionComputer probeHpc;
    private final RecordDescriptor buildRd;
    private final RecordDescriptor probeRd;
    private final RunFileWriter[] buildRFWriters;
    private final RunFileWriter[] probeRFWriters;
    private final IPredicateEvaluator buildPredEval;
    private final IPredicateEvaluator probePredEval;
    private final boolean isLeftOuter;
    private final IMissingWriter[] nonMatchWriters;
    private final BitSet spilledStatus;
    private final int numOfPartitions;
    private final int memSizeInFrames;
    private InMemoryHashJoin inMemJoiner;
    private IPartitionedTupleBufferManager bufferManager;
    private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
    private final FrameTupleAccessor accessorBuild;
    private final FrameTupleAccessor accessorProbe;
    private ISimpleFrameBufferManager bufferManagerForHashTable;
    private boolean isReversed;
    private int[] buildPSizeInTups;
    private IFrame reloadBuffer;
    private final TuplePointer tempPtr = new TuplePointer();
    private int[] probePSizeInTups;

    public OptimizedHybridHashJoin(IHyracksJobletContext iHyracksJobletContext, int i, int i2, String str, String str2, RecordDescriptor recordDescriptor, RecordDescriptor recordDescriptor2, ITuplePartitionComputer iTuplePartitionComputer, ITuplePartitionComputer iTuplePartitionComputer2, IPredicateEvaluator iPredicateEvaluator, IPredicateEvaluator iPredicateEvaluator2, boolean z, IMissingWriterFactory[] iMissingWriterFactoryArr) {
        this.jobletCtx = iHyracksJobletContext;
        this.memSizeInFrames = i;
        this.buildRd = recordDescriptor2;
        this.probeRd = recordDescriptor;
        this.buildHpc = iTuplePartitionComputer2;
        this.probeHpc = iTuplePartitionComputer;
        this.buildRelName = str2;
        this.probeRelName = str;
        this.numOfPartitions = i2;
        this.buildRFWriters = new RunFileWriter[i2];
        this.probeRFWriters = new RunFileWriter[i2];
        this.accessorBuild = new FrameTupleAccessor(recordDescriptor2);
        this.accessorProbe = new FrameTupleAccessor(recordDescriptor);
        this.isLeftOuter = z;
        if (z && iPredicateEvaluator != null) {
            throw new IllegalStateException();
        }
        this.buildPredEval = iPredicateEvaluator2;
        this.probePredEval = iPredicateEvaluator;
        this.isReversed = false;
        this.spilledStatus = new BitSet(i2);
        this.nonMatchWriters = z ? new IMissingWriter[iMissingWriterFactoryArr.length] : null;
        if (z) {
            for (int i3 = 0; i3 < iMissingWriterFactoryArr.length; i3++) {
                this.nonMatchWriters[i3] = iMissingWriterFactoryArr[i3].createMissingWriter();
            }
        }
    }

    public void initBuild() throws HyracksDataException {
        DeallocatableFramePool deallocatableFramePool = new DeallocatableFramePool(this.jobletCtx, this.memSizeInFrames * this.jobletCtx.getInitialFrameSize());
        this.bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(deallocatableFramePool);
        this.bufferManager = new VPartitionTupleBufferManager(PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(this.spilledStatus), this.numOfPartitions, deallocatableFramePool);
        this.spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(this.bufferManager, this.spilledStatus);
        this.spilledStatus.clear();
        this.buildPSizeInTups = new int[this.numOfPartitions];
    }

    public void build(ByteBuffer byteBuffer) throws HyracksDataException {
        this.accessorBuild.reset(byteBuffer);
        int tupleCount = this.accessorBuild.getTupleCount();
        for (int i = 0; i < tupleCount; i++) {
            if (this.buildPredEval == null || this.buildPredEval.evaluate(this.accessorBuild, i)) {
                int partition = this.buildHpc.partition(this.accessorBuild, i, this.numOfPartitions);
                processTupleBuildPhase(i, partition);
                int[] iArr = this.buildPSizeInTups;
                iArr[partition] = iArr[partition] + 1;
            }
        }
    }

    private void processTupleBuildPhase(int i, int i2) throws HyracksDataException {
        int selectVictimPartition;
        while (!this.bufferManager.insertTuple(i2, this.accessorBuild, i, this.tempPtr)) {
            int framesNeeded = this.bufferManager.framesNeeded(this.accessorBuild.getTupleLength(i), 0);
            int frameLimit = this.bufferManager.getConstrain().frameLimit(i2);
            if (framesNeeded > frameLimit || (selectVictimPartition = this.spillPolicy.selectVictimPartition(i2)) < 0) {
                if (framesNeeded > this.memSizeInFrames) {
                    logTupleInsertionFailure(i, i2, framesNeeded, frameLimit);
                    throw HyracksDataException.create(ErrorCode.INSUFFICIENT_MEMORY, new Serializable[0]);
                }
                if (framesNeeded <= 1) {
                    logTupleInsertionFailure(i, i2, framesNeeded, frameLimit);
                    throw new IllegalStateException("can't insert tuple in join memory");
                }
                flushBigObjectToDisk(i2, this.accessorBuild, i, this.buildRFWriters, this.buildRelName);
                this.spilledStatus.set(i2);
                if (this.bufferManager.getPhysicalSize(i2) / this.jobletCtx.getInitialFrameSize() > this.bufferManager.getConstrain().frameLimit(i2)) {
                    spillPartition(i2);
                    return;
                }
                return;
            }
            spillPartition(selectVictimPartition);
        }
    }

    private void spillPartition(int i) throws HyracksDataException {
        this.bufferManager.flushPartition(i, getSpillWriterOrCreateNewOneIfNotExist(this.buildRFWriters, this.buildRelName, i));
        this.bufferManager.clearPartition(i);
        this.spilledStatus.set(i);
    }

    private void closeBuildPartition(int i) throws HyracksDataException {
        if (this.buildRFWriters[i] == null) {
            throw new HyracksDataException("Tried to close the non-existing file writer.");
        }
        this.buildRFWriters[i].close();
    }

    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(RunFileWriter[] runFileWriterArr, String str, int i) throws HyracksDataException {
        RunFileWriter runFileWriter = runFileWriterArr[i];
        if (runFileWriter == null) {
            runFileWriter = new RunFileWriter(this.jobletCtx.createManagedWorkspaceFile(str), this.jobletCtx.getIoManager());
            runFileWriter.open();
            runFileWriterArr[i] = runFileWriter;
        }
        return runFileWriter;
    }

    public void closeBuild() throws HyracksDataException {
        closeAllSpilledPartitions(this.buildRFWriters, this.buildRelName);
        this.inMemJoiner = new InMemoryHashJoin(this.jobletCtx, new FrameTupleAccessor(this.probeRd), this.probeHpc, new FrameTupleAccessor(this.buildRd), this.buildRd, this.buildHpc, this.isLeftOuter, this.nonMatchWriters, new SerializableHashTable(makeSpaceForHashTableAndBringBackSpilledPartitions(), this.jobletCtx, this.bufferManagerForHashTable), this.isReversed, this.bufferManagerForHashTable);
        buildHashTable();
    }

    public void clearBuildTempFiles() throws HyracksDataException {
        clearTempFiles(this.buildRFWriters);
    }

    public void clearProbeTempFiles() throws HyracksDataException {
        clearTempFiles(this.probeRFWriters);
    }

    private void clearTempFiles(RunFileWriter[] runFileWriterArr) throws HyracksDataException {
        for (int i = 0; i < runFileWriterArr.length; i++) {
            if (runFileWriterArr[i] != null) {
                runFileWriterArr[i].erase();
            }
        }
    }

    public void fail() throws HyracksDataException {
        for (IFrameWriter iFrameWriter : this.buildRFWriters) {
            if (iFrameWriter != null) {
                CleanupUtils.fail(iFrameWriter, (Throwable) null);
            }
        }
        for (IFrameWriter iFrameWriter2 : this.probeRFWriters) {
            if (iFrameWriter2 != null) {
                CleanupUtils.fail(iFrameWriter2, (Throwable) null);
            }
        }
    }

    private void closeAllSpilledPartitions(RunFileWriter[] runFileWriterArr, String str) throws HyracksDataException {
        try {
            int nextSetBit = this.spilledStatus.nextSetBit(0);
            while (nextSetBit >= 0) {
                if (nextSetBit >= this.numOfPartitions) {
                    break;
                }
                if (this.bufferManager.getNumTuples(nextSetBit) > 0) {
                    this.bufferManager.flushPartition(nextSetBit, getSpillWriterOrCreateNewOneIfNotExist(runFileWriterArr, str, nextSetBit));
                    this.bufferManager.clearPartition(nextSetBit);
                }
                nextSetBit = this.spilledStatus.nextSetBit(nextSetBit + 1);
            }
        } finally {
            for (RunFileWriter runFileWriter : runFileWriterArr) {
                if (runFileWriter != null) {
                    runFileWriter.close();
                }
            }
        }
    }

    private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
        int initialFrameSize = this.jobletCtx.getInitialFrameSize();
        long cardinality = (this.memSizeInFrames - this.spilledStatus.cardinality()) * initialFrameSize;
        int i = 0;
        int nextClearBit = this.spilledStatus.nextClearBit(0);
        while (true) {
            int i2 = nextClearBit;
            if (i2 < 0 || i2 >= this.numOfPartitions) {
                break;
            }
            cardinality -= this.bufferManager.getPhysicalSize(i2);
            i += this.buildPSizeInTups[i2];
            nextClearBit = this.spilledStatus.nextClearBit(i2 + 1);
        }
        return spillAndReloadPartitions(initialFrameSize, cardinality - SerializableHashTable.getExpectedTableByteSize(i, initialFrameSize), i);
    }

    private int spillAndReloadPartitions(int i, long j, int i2) throws HyracksDataException {
        long j2 = j;
        int i3 = i2;
        while (j2 < 0) {
            int selectSinglePartitionToSpill = selectSinglePartitionToSpill(j2, i3, i);
            if (selectSinglePartitionToSpill < 0) {
                throw new HyracksDataException("Hash join does not have enough memory even after spilling.");
            }
            int i4 = this.buildPSizeInTups[selectSinglePartitionToSpill];
            long j3 = -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(i3, -i4, i);
            i3 -= i4;
            j2 += (this.bufferManager.getPhysicalSize(selectSinglePartitionToSpill) + j3) - i;
            spillPartition(selectSinglePartitionToSpill);
            closeBuildPartition(selectSinglePartitionToSpill);
        }
        return bringPartitionsBack(j2, i3, i);
    }

    private int bringPartitionsBack(long j, int i, int i2) throws HyracksDataException {
        int i3 = 0;
        int i4 = i;
        long j2 = j;
        while (true) {
            long j3 = j2;
            int selectAPartitionToReload = selectAPartitionToReload(j3, i3, i4);
            i3 = selectAPartitionToReload;
            if (selectAPartitionToReload < 0 || !loadSpilledPartitionToMem(i3, this.buildRFWriters[i3])) {
                break;
            }
            i4 += this.buildPSizeInTups[i3];
            j2 = ((j3 - this.bufferManager.getPhysicalSize(i3)) - SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(i, this.buildPSizeInTups[i3], i2)) + i2;
        }
        return i4;
    }

    private int selectSinglePartitionToSpill(long j, int i, int i2) {
        long j2 = this.memSizeInFrames * i2;
        int i3 = -1;
        int i4 = -1;
        int nextClearBit = this.spilledStatus.nextClearBit(0);
        while (true) {
            int i5 = nextClearBit;
            if (i5 < 0 || i5 >= this.numOfPartitions) {
                break;
            }
            if (this.buildPSizeInTups[i5] != 0 && this.bufferManager.getPhysicalSize(i5) != 0) {
                if (i4 < 0) {
                    i4 = i5;
                }
                long physicalSize = ((j + this.bufferManager.getPhysicalSize(i5)) - i2) + (-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(i, -this.buildPSizeInTups[i5], i2));
                if (physicalSize == 0) {
                    return i5;
                }
                if (physicalSize > 0 && physicalSize < j2) {
                    j2 = physicalSize;
                    i3 = i5;
                }
            }
            nextClearBit = this.spilledStatus.nextClearBit(i5 + 1);
        }
        return i3 >= 0 ? i3 : i4;
    }

    private int selectAPartitionToReload(long j, int i, int i2) {
        int initialFrameSize = this.jobletCtx.getInitialFrameSize();
        long j2 = j + initialFrameSize;
        if (j2 <= 0) {
            return -1;
        }
        int nextSetBit = this.spilledStatus.nextSetBit(i);
        while (true) {
            int i3 = nextSetBit;
            if (i3 < 0 || i3 >= this.numOfPartitions) {
                return -1;
            }
            if (j2 >= this.buildRFWriters[i3].getFileSize() + SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(i2, this.buildPSizeInTups[i3], initialFrameSize)) {
                return i3;
            }
            nextSetBit = this.spilledStatus.nextSetBit(i3 + 1);
        }
    }

    private boolean loadSpilledPartitionToMem(int i, RunFileWriter runFileWriter) throws HyracksDataException {
        GeneratedRunFileReader createReader = runFileWriter.createReader();
        try {
            createReader.open();
            if (this.reloadBuffer == null) {
                this.reloadBuffer = new VSizeFrame(this.jobletCtx);
            }
            while (createReader.nextFrame(this.reloadBuffer)) {
                this.accessorBuild.reset(this.reloadBuffer.getBuffer());
                for (int i2 = 0; i2 < this.accessorBuild.getTupleCount(); i2++) {
                    if (!this.bufferManager.insertTuple(i, this.accessorBuild, i2, this.tempPtr)) {
                        this.bufferManager.clearPartition(i);
                        createReader.close();
                        return false;
                    }
                }
            }
            createReader.setDeleteAfterClose(true);
            createReader.close();
            this.spilledStatus.set(i, false);
            this.buildRFWriters[i] = null;
            return true;
        } catch (Throwable th) {
            createReader.close();
            throw th;
        }
    }

    private void buildHashTable() throws HyracksDataException {
        for (int i = 0; i < this.numOfPartitions; i++) {
            if (!this.spilledStatus.get(i)) {
                this.bufferManager.flushPartition(i, new IFrameWriter() { // from class: org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoin.1
                    public void open() {
                    }

                    public void nextFrame(ByteBuffer byteBuffer) throws HyracksDataException {
                        OptimizedHybridHashJoin.this.inMemJoiner.build(byteBuffer);
                    }

                    public void fail() {
                    }

                    public void close() {
                    }
                });
            }
        }
    }

    public void initProbe(ITuplePairComparator iTuplePairComparator) {
        this.probePSizeInTups = new int[this.numOfPartitions];
        this.inMemJoiner.setComparator(iTuplePairComparator);
        this.bufferManager.setConstrain(VPartitionTupleBufferManager.NO_CONSTRAIN);
    }

    public void probe(ByteBuffer byteBuffer, IFrameWriter iFrameWriter) throws HyracksDataException {
        this.accessorProbe.reset(byteBuffer);
        int tupleCount = this.accessorProbe.getTupleCount();
        this.inMemJoiner.resetAccessorProbe(this.accessorProbe);
        if (isBuildRelAllInMemory()) {
            for (int i = 0; i < tupleCount; i++) {
                if (this.probePredEval == null || this.probePredEval.evaluate(this.accessorProbe, i)) {
                    this.inMemJoiner.join(i, iFrameWriter);
                }
            }
            return;
        }
        for (int i2 = 0; i2 < tupleCount; i2++) {
            if (this.probePredEval == null || this.probePredEval.evaluate(this.accessorProbe, i2)) {
                int partition = this.probeHpc.partition(this.accessorProbe, i2, this.numOfPartitions);
                if (this.buildPSizeInTups[partition] > 0 || this.isLeftOuter) {
                    if (this.spilledStatus.get(partition)) {
                        processTupleProbePhase(i2, partition);
                    } else {
                        this.inMemJoiner.join(i2, iFrameWriter);
                    }
                    int[] iArr = this.probePSizeInTups;
                    iArr[partition] = iArr[partition] + 1;
                }
            }
        }
    }

    private void processTupleProbePhase(int i, int i2) throws HyracksDataException {
        if (this.bufferManager.insertTuple(i2, this.accessorProbe, i, this.tempPtr)) {
            return;
        }
        int calculateActualSize = VPartitionTupleBufferManager.calculateActualSize(null, this.accessorProbe.getTupleLength(i));
        int findSpilledPartitionWithMaxMemoryUsage = (!(calculateActualSize <= this.jobletCtx.getInitialFrameSize() / 2) || this.bufferManager.getNumTuples(i2) <= 0) ? this.spillPolicy.findSpilledPartitionWithMaxMemoryUsage() : i2;
        if (findSpilledPartitionWithMaxMemoryUsage < 0 || this.bufferManager.getPhysicalSize(findSpilledPartitionWithMaxMemoryUsage) < calculateActualSize) {
            flushBigObjectToDisk(i2, this.accessorProbe, i, this.probeRFWriters, this.probeRelName);
            return;
        }
        this.bufferManager.flushPartition(findSpilledPartitionWithMaxMemoryUsage, getSpillWriterOrCreateNewOneIfNotExist(this.probeRFWriters, this.probeRelName, findSpilledPartitionWithMaxMemoryUsage));
        this.bufferManager.clearPartition(findSpilledPartitionWithMaxMemoryUsage);
        if (this.bufferManager.insertTuple(i2, this.accessorProbe, i, this.tempPtr)) {
            return;
        }
        flushBigObjectToDisk(i2, this.accessorProbe, i, this.probeRFWriters, this.probeRelName);
    }

    private void flushBigObjectToDisk(int i, FrameTupleAccessor frameTupleAccessor, int i2, RunFileWriter[] runFileWriterArr, String str) throws HyracksDataException {
        if (this.bigFrameAppender == null) {
            this.bigFrameAppender = new FrameTupleAppender(new VSizeFrame(this.jobletCtx));
        }
        RunFileWriter spillWriterOrCreateNewOneIfNotExist = getSpillWriterOrCreateNewOneIfNotExist(runFileWriterArr, str, i);
        if (!this.bigFrameAppender.append(frameTupleAccessor, i2)) {
            throw new HyracksDataException("The given tuple is too big");
        }
        this.bigFrameAppender.write(spillWriterOrCreateNewOneIfNotExist, true);
    }

    private boolean isBuildRelAllInMemory() {
        return this.spilledStatus.nextSetBit(0) < 0;
    }

    public void completeProbe(IFrameWriter iFrameWriter) throws HyracksDataException {
        this.inMemJoiner.completeJoin(iFrameWriter);
    }

    public void releaseResource() throws HyracksDataException {
        this.inMemJoiner.closeTable();
        closeAllSpilledPartitions(this.probeRFWriters, this.probeRelName);
        this.bufferManager.close();
        this.inMemJoiner = null;
        this.bufferManager = null;
        this.bufferManagerForHashTable = null;
    }

    public RunFileReader getBuildRFReader(int i) throws HyracksDataException {
        if (this.buildRFWriters[i] == null) {
            return null;
        }
        return this.buildRFWriters[i].createDeleteOnCloseReader();
    }

    public int getBuildPartitionSizeInTup(int i) {
        return this.buildPSizeInTups[i];
    }

    public RunFileReader getProbeRFReader(int i) throws HyracksDataException {
        if (this.probeRFWriters[i] == null) {
            return null;
        }
        return this.probeRFWriters[i].createDeleteOnCloseReader();
    }

    public int getProbePartitionSizeInTup(int i) {
        return this.probePSizeInTups[i];
    }

    public int getMaxBuildPartitionSize() {
        return getMaxPartitionSize(this.buildPSizeInTups);
    }

    public int getMaxProbePartitionSize() {
        return getMaxPartitionSize(this.probePSizeInTups);
    }

    private int getMaxPartitionSize(int[] iArr) {
        int i = iArr[0];
        for (int i2 = 1; i2 < iArr.length; i2++) {
            if (iArr[i2] > i) {
                i = iArr[i2];
            }
        }
        return i;
    }

    public BitSet getPartitionStatus() {
        return this.spilledStatus;
    }

    public int getPartitionSize(int i) {
        return this.bufferManager.getPhysicalSize(i);
    }

    public void setIsReversed(boolean z) {
        if (z && (this.buildPredEval != null || this.probePredEval != null)) {
            throw new IllegalStateException();
        }
        this.isReversed = z;
    }

    private void logTupleInsertionFailure(int i, int i2, int i3, int i4) {
        LOGGER.debug("can't insert tuple in join memory. {}", String.format("partition %s, tuple size %s, needed # frames %s, partition frame limit %s, join memory in frames %s, initial frame size %s", Integer.valueOf(i2), Integer.valueOf(VPartitionTupleBufferManager.calculateActualSize(null, this.accessorBuild.getTupleLength(i))), Integer.valueOf(i3), Integer.valueOf(i4), Integer.valueOf(this.memSizeInFrames), Integer.valueOf(this.jobletCtx.getInitialFrameSize())));
        LOGGER.debug("partitions status:\n{}", this.spillPolicy.partitionsStatus());
    }
}
