/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.invertedindex;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;

/**
 * Reducer that turns flat-table rows into inverted-index slices.
 *
 * <p>Incoming values are parsed into {@link StreamingMessage}s and buffered. Every time
 * the buffer reaches the slice size configured on the inverted index descriptor, the
 * buffered messages are built into a {@link Slice}, encoded with {@link IIKeyValueCodec}
 * and written to the job output. Whatever is still buffered when the task finishes is
 * flushed from {@link #cleanup}.
 *
 * <p>Each output value has the layout
 * {@code [4-byte length of value][value bytes][dictionary bytes]}.
 */
public class InvertedIndexReducer
extends KylinReducer<LongWritable, Object, ImmutableBytesWritable, ImmutableBytesWritable> {
    /** Initial capacity of the reusable output buffer (1 MiB); it grows on demand. */
    private static final int INITIAL_VALUE_BUFFER_SIZE = 0x100000;

    private TableRecordInfo info;
    private IIKeyValueCodec kv;
    private IMRInput.IMRTableInputFormat flatTableInputFormat;
    private SliceBuilder sliceBuilder;
    /** Messages collected so far for the slice currently being filled. */
    private ArrayList<StreamingMessage> messages;
    /** Number of buffered messages that triggers building and emitting a slice. */
    private int sliceSize;
    /** Reusable output value wrapper; re-pointed at {@link #valueBuf} for every row. */
    private ImmutableBytesWritable immutableBytesWritable;
    /** Reusable scratch buffer in which each output value is assembled. */
    private ByteBuffer valueBuf;

    /**
     * Loads the inverted index named by the {@code ii.name} job property and prepares
     * the codec, slice builder and buffers used by this task.
     *
     * @param context the task context supplying the job configuration and task id
     * @throws IOException if loading the Kylin properties/metadata fails
     * @throws IllegalStateException if {@code ii.name} is not set, the named inverted
     *         index does not exist, it has no segment, or the task id does not fit
     *         into a {@code short} shard number
     */
    protected void setup(Reducer.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration conf = context.getConfiguration();
        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        IIManager mgr = IIManager.getInstance(config);

        // Fail with a descriptive message instead of an anonymous NullPointerException
        // when the job is misconfigured.
        String iiName = conf.get("ii.name");
        if (iiName == null) {
            throw new IllegalStateException("Job property 'ii.name' is not set");
        }
        IIInstance ii = mgr.getII(iiName);
        if (ii == null) {
            throw new IllegalStateException("Inverted index '" + iiName + "' not found in metadata");
        }
        IISegment seg = ii.getFirstSegment();
        if (seg == null) {
            throw new IllegalStateException("Inverted index '" + iiName + "' has no segment");
        }

        this.info = new TableRecordInfo(seg);
        this.kv = new IIKeyValueCodec(this.info.getDigest());
        this.flatTableInputFormat = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat();
        this.sliceSize = ii.getDescriptor().getSliceSize();

        // The task id doubles as the shard number. SliceBuilder takes a short, so reject
        // ids that would silently wrap around when narrowed.
        int taskId = context.getTaskAttemptID().getTaskID().getId();
        if (taskId < 0 || taskId > Short.MAX_VALUE) {
            throw new IllegalStateException("Task id " + taskId + " cannot be used as a shard number (must be within 0.." + Short.MAX_VALUE + ")");
        }
        short shard = (short) taskId;
        System.out.println("Generating to shard - " + shard);
        this.sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard);

        this.messages = Lists.newArrayListWithCapacity(this.sliceSize);
        this.immutableBytesWritable = new ImmutableBytesWritable();
        this.valueBuf = ByteBuffer.allocate(INITIAL_VALUE_BUFFER_SIZE);
    }

    /**
     * Buffers every incoming value as a message and emits a slice each time the buffer
     * reaches the configured slice size.
     *
     * @param key the reduce key; not used by this implementation
     * @param values the flat-table records for this key
     * @param context the task context that receives the encoded rows
     * @throws IOException if building or writing a slice fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    public void reduce(LongWritable key, Iterable<Object> values, Reducer.Context context) throws IOException, InterruptedException {
        for (Object v : values) {
            String[] row = this.flatTableInputFormat.parseMapperInput(v);
            this.messages.add(this.parse(row));
            if (this.messages.size() >= this.sliceSize) {
                this.flushPending(context);
                // Start a fresh list rather than clearing, so the batch that was just
                // handed off never shares state with the next slice; keep it pre-sized
                // the same way setup() does.
                this.messages = Lists.newArrayListWithCapacity(this.sliceSize);
            }
        }
    }

    /**
     * Wraps one parsed row into a message, using the current wall-clock time as both
     * its offset and its timestamp and an empty parameter map.
     *
     * @param row the column values of one flat-table record
     * @return the message carrying the row
     */
    private StreamingMessage parse(String[] row) {
        long now = System.currentTimeMillis();
        return new StreamingMessage(Lists.newArrayList(row), now, now, Collections.emptyMap());
    }

    /**
     * Emits the messages that are still buffered when the task ends, i.e. the last,
     * possibly partial, slice.
     *
     * @param context the task context that receives the encoded rows
     * @throws IOException if building or writing the slice fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
        if (!this.messages.isEmpty()) {
            this.flushPending(context);
            this.messages.clear();
        }
    }

    /**
     * Builds a batch from the currently buffered messages, stamped with the current
     * wall-clock time as both elements of its time pair, and writes it out. The caller
     * is responsible for resetting the buffer afterwards.
     */
    private void flushPending(Reducer.Context context) throws IOException, InterruptedException {
        long now = System.currentTimeMillis();
        this.buildAndOutput(new StreamingBatch(this.messages, Pair.newPair(now, now)), context);
    }

    /**
     * Builds a slice from the batch, encodes it and writes one output record per
     * encoded row. The output value is
     * {@code [4-byte length of value][value bytes][dictionary bytes]}.
     *
     * @param streamingBatch the messages forming one slice
     * @param context the task context that receives the encoded rows
     * @throws IOException if building or writing the slice fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    private void buildAndOutput(StreamingBatch streamingBatch, Reducer.Context context) throws IOException, InterruptedException {
        Slice slice = this.sliceBuilder.buildSlice(streamingBatch);
        for (IIRow pair : this.kv.encodeKeyValue(slice)) {
            ImmutableBytesWritable value = pair.getValue();
            ImmutableBytesWritable dictionary = pair.getDictionary();
            int newLength = 4 + value.getLength() + dictionary.getLength();
            // Grow the scratch buffer when a row does not fit; otherwise reuse it.
            if (newLength > this.valueBuf.capacity()) {
                this.valueBuf = ByteBuffer.allocate(newLength);
            }
            this.valueBuf.clear();
            this.valueBuf.putInt(value.getLength());
            this.valueBuf.put(value.get(), value.getOffset(), value.getLength());
            this.valueBuf.put(dictionary.get(), dictionary.getOffset(), dictionary.getLength());
            this.immutableBytesWritable.set(this.valueBuf.array(), 0, newLength);
            context.write(pair.getKey(), this.immutableBytesWritable);
        }
    }
}

