/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.steps;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.Dictionary;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.steps.MapContextGTRecordWriter;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;

/**
 * Mapper that feeds flat-table rows to an in-memory cube builder running on a
 * background thread.
 *
 * <p>{@code setup} starts the builder, {@code map} hands each parsed row to it
 * through a bounded queue, and {@code cleanup} enqueues an empty row as the
 * end-of-input marker and waits for the builder to finish. The builder is
 * given a {@link MapContextGTRecordWriter} wrapping the map context.
 *
 * @param <KEYIN> type of the mapper's input key (the key is not read by this class)
 */
public class InMemCuboidMapper<KEYIN>
extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
    private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);

    /** Maximum number of rows buffered between the mapper thread and the builder thread. */
    private static final int QUEUE_CAPACITY = 64;
    /** How long a single {@code offer} waits before the builder's liveness is re-checked. */
    private static final long OFFER_TIMEOUT_SECONDS = 1L;
    /** A progress line is logged every this many accepted rows. */
    private static final int LOG_INTERVAL = 100000;

    private CubeInstance cube;
    private CubeDesc cubeDesc;
    private CubeSegment cubeSegment;
    private IMRInput.IMRTableInputFormat flatTableInputFormat;
    /** Number of rows successfully handed to the builder. */
    private int counter;
    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(QUEUE_CAPACITY);
    /** Handle on the background cube-building task; assigned in {@code setup}. */
    private Future<?> future;

    /**
     * Resolves the cube and segment named in the job configuration
     * ({@code cube.name}, {@code cube.segment.name}), collects the segment's
     * dictionaries and starts the cube builder on a single background thread.
     *
     * @param context the Hadoop mapper context
     * @throws IOException declared for the framework contract; may be raised while loading metadata
     */
    protected void setup(Mapper.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration conf = context.getConfiguration();
        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        String cubeName = conf.get("cube.name");
        this.cube = CubeManager.getInstance(config).getCube(cubeName);
        this.cubeDesc = this.cube.getDescriptor();
        String segmentName = conf.get("cube.segment.name");
        this.cubeSegment = this.cube.getSegment(segmentName, SegmentStatusEnum.NEW);
        this.flatTableInputFormat = MRUtil.getBatchCubingInputSide(this.cubeSegment).getFlatTableInputFormat();

        HashMap dictionaryMap = Maps.newHashMap();
        for (TblColRef col : this.cubeDesc.getAllColumnsNeedDictionary()) {
            Dictionary<String> dict = this.cubeSegment.getDictionary(col);
            if (dict == null) {
                logger.warn("Dictionary for " + col + " was not found.");
            }
            // A missing dictionary is still registered (as null), as before;
            // the value fetched above is reused instead of looking it up twice.
            dictionaryMap.put(col, dict);
        }

        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(this.cube.getDescriptor(), dictionaryMap);
        cubeBuilder.setReserveMemoryMB(this.calculateReserveMB(conf));

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            this.future = executorService.submit(cubeBuilder.buildAsRunnable(this.queue, new MapContextGTRecordWriter((MapContext<?, ?, ByteArrayWritable, ByteArrayWritable>)context, this.cubeDesc, this.cubeSegment)));
        } finally {
            // shutdown() lets the already-submitted build run to completion but
            // allows the worker thread to exit afterwards instead of lingering.
            executorService.shutdown();
        }
    }

    /**
     * Computes how many megabytes the cube builder is told to reserve: the
     * configured {@code mapreduce.task.io.sort.mb} (default 100) plus the
     * larger of 100 and one tenth of the available memory reported by
     * {@link MemoryBudgetController#getSystemAvailMB()}.
     *
     * @param configuration the job configuration
     * @return the number of megabytes to reserve
     */
    private int calculateReserveMB(Configuration configuration) {
        int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
        int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
        int sysReserve = Math.max(sysAvailMB / 10, 100);
        int reserveMB = mrReserve + sysReserve;
        logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)");
        return reserveMB;
    }

    /**
     * Parses one input record and hands the resulting row to the cube builder,
     * retrying while the queue is full and the builder is still running.
     *
     * @param key the input key (unused)
     * @param record the raw input record, parsed by the flat-table input format
     * @param context the Hadoop mapper context
     * @throws IOException if the builder has already terminated, so the row cannot be delivered
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    public void map(KEYIN key, Object record, Mapper.Context context) throws IOException, InterruptedException {
        String[] row = this.flatTableInputFormat.parseMapperInput(record);
        List<String> rowAsList = Arrays.asList(row);
        while (!this.future.isDone()) {
            if (!this.queue.offer(rowAsList, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                continue;
            }
            ++this.counter;
            if (this.counter % LOG_INTERVAL == 0) {
                logger.info("Handled " + this.counter + " records!");
            }
            return;
        }
        // The builder finished before accepting this row. Surface its failure
        // now rather than silently dropping this row and the rest of the split.
        this.awaitBuilder(context);
        throw new IOException("Cube builder terminated before all input was consumed in mapper " + context.getTaskAttemptID().getTaskID().getId());
    }

    /**
     * Signals end of input to the builder with an empty row, waits for it to
     * finish and then clears the queue.
     *
     * @param context the Hadoop mapper context
     * @throws IOException if the cube build failed
     * @throws InterruptedException if interrupted while waiting for the builder
     */
    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
        logger.info("Totally handled " + this.counter + " records!");
        try {
            // Keep offering the end marker until it is accepted or the builder is gone.
            while (!this.future.isDone() && !this.queue.offer(Collections.<String>emptyList(), OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            }
            this.awaitBuilder(context);
        } finally {
            this.queue.clear();
        }
    }

    /**
     * Waits for the background build to complete, translating a build failure
     * into an {@link IOException} that carries the original cause.
     */
    private void awaitBuilder(Mapper.Context context) throws IOException, InterruptedException {
        try {
            this.future.get();
        } catch (InterruptedException e) {
            // Propagate interruption unchanged; callers already declare it.
            throw e;
        } catch (Exception e) {
            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
        }
    }
}

