package org.apache.kylin.engine.mr.steps;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-mr-2.5.0.jar:org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.class */
public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) InMemCuboidMapperBase.class);
    private int reserveMemoryMB;
    protected CubeInstance cube;
    protected CubeDesc cubeDesc;
    protected CubeSegment cubeSegment;
    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
    protected IJoinedFlatTableDesc flatDesc;
    protected int taskThreadCount;
    protected InputConverterUnit<T> inputConverterUnit;
    private Future<?> future;
    private int nSplit = 1;
    private int countOfLastSplit = 0;
    private int counter = 0;
    private int splitRowThreshold = Integer.MAX_VALUE;
    private int unitRows = 1000;
    protected BlockingQueue<T> queue = new LinkedBlockingQueue(2000);

    protected abstract InputConverterUnit<T> getInputConverterUnit(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context);

    protected abstract Future getCubingThreadFuture(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context, Map<TblColRef, Dictionary<String>> map, int i, CuboidScheduler cuboidScheduler);

    protected abstract T getRecordFromKeyValue(KEYIN keyin, VALUEIN valuein);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.kylin.engine.mr.KylinMapper
    public void doSetup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration configuration = context.getConfiguration();
        KylinConfig loadKylinPropsAndMetadata = AbstractHadoopJob.loadKylinPropsAndMetadata();
        this.cube = CubeManager.getInstance(loadKylinPropsAndMetadata).getCube(configuration.get(BatchConstants.CFG_CUBE_NAME));
        this.cubeDesc = this.cube.getDescriptor();
        this.cubeSegment = this.cube.getSegmentById(context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID));
        this.flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(this.cubeSegment), this.cubeDesc);
        this.dictionaryMap = DictionaryGetterUtil.getDictionaryMap(this.cubeSegment, context.getInputSplit(), configuration);
        if (this.cubeDesc.hasMemoryHungryMeasures()) {
            this.unitRows /= 10;
        }
        CuboidScheduler cuboidSchedulerByMode = CuboidSchedulerUtil.getCuboidSchedulerByMode(this.cubeSegment, configuration.get(BatchConstants.CFG_CUBOID_MODE));
        this.taskThreadCount = loadKylinPropsAndMetadata.getCubeAlgorithmInMemConcurrentThreads();
        this.reserveMemoryMB = calculateReserveMB(configuration);
        this.inputConverterUnit = getInputConverterUnit(context);
        this.future = getCubingThreadFuture(context, this.dictionaryMap, this.reserveMemoryMB, cuboidSchedulerByMode);
    }

    private int calculateReserveMB(Configuration configuration) {
        int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
        int i = configuration.getInt("mapreduce.task.io.sort.mb", 100);
        int max = Math.max(systemAvailMB / 10, 100);
        int i2 = i + max;
        logger.info("Reserve " + i2 + " MB = " + i + " (MR reserve) + " + max + " (SYS reserve)");
        return i2;
    }

    @Override // org.apache.kylin.engine.mr.KylinMapper
    public void doMap(KEYIN keyin, VALUEIN valuein, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        if (!offer(context, getRecordFromKeyValue(keyin, valuein), 1L, TimeUnit.MINUTES, 60)) {
            throw new IOException("Failed to offer row to internal queue due to queue full!");
        }
        this.counter++;
        this.countOfLastSplit++;
        if (this.counter % 100000 == 0) {
            logger.info("Handled " + this.counter + " records, internal queue size = " + this.queue.size());
        }
        if (this.counter % this.unitRows == 0 && shouldCutSplit(this.nSplit, this.countOfLastSplit)) {
            if (!offer(context, this.inputConverterUnit.getCutRow(), 1L, TimeUnit.MINUTES, 60)) {
                throw new IOException("Failed to offer row to internal queue due to queue full!");
            }
            this.countOfLastSplit = 0;
            this.nSplit++;
        }
    }

    @Override // org.apache.kylin.engine.mr.KylinMapper
    protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        logger.info("Totally handled " + this.mapCounter + " records!");
        while (!this.future.isDone() && !this.queue.offer(this.inputConverterUnit.getEndRow(), 1L, TimeUnit.SECONDS)) {
        }
        futureGet(context);
        this.queue.clear();
    }

    private boolean shouldCutSplit(int i, long j) {
        int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
        logger.info(j + " records went into split #" + i + "; " + systemAvailMB + " MB left, " + this.reserveMemoryMB + " MB threshold");
        if (j >= this.splitRowThreshold) {
            logger.info("Split cut due to hitting splitRowThreshold " + this.splitRowThreshold);
            return true;
        }
        if (systemAvailMB > this.reserveMemoryMB) {
            return false;
        }
        logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + this.reserveMemoryMB + " MB");
        return true;
    }

    private boolean offer(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context, T t, long j, TimeUnit timeUnit, int i) throws IOException, InterruptedException {
        while (i > 0) {
            if (this.queue.offer(t, j, timeUnit)) {
                return true;
            }
            if (this.future.isDone()) {
                futureGet(context);
                throw new IOException("Failed to build cube in mapper due to cubing thread exit unexpectedly");
            }
            i--;
        }
        return false;
    }

    private void futureGet(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException {
        try {
            this.future.get();
        } catch (Exception e) {
            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
        }
    }
}
