package org.apache.kylin.engine.streaming.cube;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.cube.util.CubingUtils;
import org.apache.kylin.dimension.Dictionary;
import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-streaming-1.5.2.jar:org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.class */
/**
 * {@link StreamingBatchBuilder} implementation that builds a cube segment from a
 * {@link StreamingBatch} for the cube identified by name at construction time.
 *
 * <p>The cube is re-read through {@code CubeManager.reloadCubeLocal} at the start of
 * each operation rather than being cached on this instance.
 *
 * <p>The instance keeps mutable state ({@code processedRowCount}) that is written by
 * {@link #build} and read by {@link #commit}; no synchronization is applied here.
 */
public class StreamingCubeBuilder implements StreamingBatchBuilder {
    private static final Logger logger = LoggerFactory.getLogger(StreamingCubeBuilder.class);

    /** Message used for every failure raised while building from a batch. */
    private static final String BUILD_ERROR_MESSAGE = "error build cube from StreamingBatch";

    /** Maps a streaming message to its row data; shared by sampling and dictionary building. */
    private static final Function<StreamingMessage, List<String>> MESSAGE_TO_ROW = new Function<StreamingMessage, List<String>>() {
        @Override
        @Nullable
        public List<String> apply(@Nullable StreamingMessage streamingMessage) {
            return streamingMessage.getData();
        }
    };

    private final String cubeName;

    /** Number of messages handed to the most recent {@link #build} call; reported by {@link #commit}. */
    private int processedRowCount = 0;

    /**
     * @param str name of the cube this builder operates on
     */
    public StreamingCubeBuilder(String str) {
        this.cubeName = str;
    }

    /**
     * Feeds every message of the batch to an {@link InMemCubeBuilder} running on a
     * background thread, waits for it to finish, then flushes the writer.
     *
     * <p>The writer is always closed and the background executor is always shut down,
     * whether the build succeeds or fails.
     *
     * @param streamingBatch batch whose messages are the input rows
     * @param map            dictionaries passed through to the in-memory cube builder
     * @param iCuboidWriter  sink for the built cuboids; flushed on success, closed in all cases
     * @throws RuntimeException if the calling thread is interrupted, the background build
     *                          fails, or flushing/closing the writer fails
     */
    @Override
    public void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> map, ICuboidWriter iCuboidWriter) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(this.cubeName);
            LinkedBlockingQueue<List<String>> rowQueue = new LinkedBlockingQueue<List<String>>();
            InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), map);
            Future<?> buildTask = executor.submit(cubeBuilder.buildAsRunnable(rowQueue, iCuboidWriter));

            List<StreamingMessage> messages = streamingBatch.getMessages();
            this.processedRowCount = messages.size();
            for (StreamingMessage message : messages) {
                rowQueue.put(message.getData());
            }
            // An empty row is queued after the real rows; presumably the end-of-input
            // marker expected by InMemCubeBuilder -- confirm against its implementation.
            rowQueue.put(Collections.<String>emptyList());

            buildTask.get();
            iCuboidWriter.flush();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // Unwrap so the failure raised inside the build task is the reported cause.
            throw new RuntimeException(BUILD_ERROR_MESSAGE, e.getCause());
        } catch (IOException e) {
            // Keep the IOException itself as the cause; its own cause is usually null.
            throw new RuntimeException(BUILD_ERROR_MESSAGE, e);
        } finally {
            try {
                // The task has already completed on the success path; on a failure path
                // this interrupts a task that may still be running before the writer closes.
                executor.shutdownNow();
            } finally {
                closeWriter(iCuboidWriter);
            }
        }
    }

    /** Closes the writer, converting an I/O failure into the unchecked build error. */
    private static void closeWriter(ICuboidWriter writer) {
        try {
            writer.close();
        } catch (IOException e) {
            throw new RuntimeException(BUILD_ERROR_MESSAGE, e);
        }
    }

    /**
     * Appends a new segment covering the batch's time range to the cube and stamps it
     * with its own UUID as last build job id, the batch's message count as input
     * records, and the current time as last build time.
     *
     * @param streamingBatch batch providing the time range and message count
     * @return the newly appended {@link CubeSegment}
     * @throws RuntimeException if appending the segment fails with an {@link IOException}
     */
    @Override
    public IBuildable createBuildable(StreamingBatch streamingBatch) {
        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        try {
            CubeInstance cube = cubeManager.reloadCubeLocal(this.cubeName);
            long startTime = streamingBatch.getTimeRange().getFirst().longValue();
            long endTime = streamingBatch.getTimeRange().getSecond().longValue();
            CubeSegment segment = cubeManager.appendSegments(cube, startTime, endTime, false, false);
            segment.setLastBuildJobID(segment.getUuid());
            segment.setInputRecords(streamingBatch.getMessages().size());
            segment.setLastBuildTime(System.currentTimeMillis());
            return segment;
        } catch (IOException e) {
            throw new RuntimeException("failed to create IBuildable", e);
        }
    }

    /**
     * Runs {@code CubingUtils.sampling} over the rows of the batch using the cube's
     * descriptor, and logs how long the sampling took.
     *
     * @param streamingBatch batch whose message rows are sampled
     * @return the counters produced by {@code CubingUtils.sampling}, keyed by {@code Long}
     */
    @Override
    public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(this.cubeName);
        long startMillis = System.currentTimeMillis();
        Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cube.getDescriptor(), rowsOf(streamingBatch));
        logger.info("sampling of {} messages cost {} ms", streamingBatch.getMessages().size(), System.currentTimeMillis() - startMillis);
        return samplingResult;
    }

    /**
     * Builds dictionaries from the rows of the batch and writes them for the given
     * segment via {@code CubingUtils.writeDictionary}.
     *
     * @param streamingBatch batch providing the rows and the time range
     * @param iBuildable     target segment; must be a {@link CubeSegment}
     * @return the dictionaries returned by {@code CubingUtils.writeDictionary}
     * @throws RuntimeException if building or writing fails with an {@link IOException}
     */
    @Override
    public Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable iBuildable) {
        try {
            CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(this.cubeName);
            return CubingUtils.writeDictionary(
                    (CubeSegment) iBuildable,
                    CubingUtils.buildDictionary(cube, rowsOf(streamingBatch)),
                    streamingBatch.getTimeRange().getFirst().longValue(),
                    streamingBatch.getTimeRange().getSecond().longValue());
        } catch (IOException e) {
            throw new RuntimeException("failed to build dictionary", e);
        }
    }

    /**
     * Marks the segment as {@code READY}, records the row count seen by the last
     * {@link #build} call as its input records, and adds it to its cube through
     * {@code CubeManager.updateCube}.
     *
     * @param iBuildable segment to commit; must be a {@link CubeSegment}
     * @throws RuntimeException if updating the cube fails with an {@link IOException}
     */
    @Override
    public void commit(IBuildable iBuildable) {
        CubeSegment segment = (CubeSegment) iBuildable;
        segment.setStatus(SegmentStatusEnum.READY);
        segment.setInputRecords(this.processedRowCount);
        CubeUpdate update = new CubeUpdate(segment.getCubeInstance());
        update.setToAddSegs(segment);
        try {
            CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(update);
        } catch (IOException e) {
            throw new RuntimeException("failed to update CubeSegment", e);
        }
    }

    /** Returns a view of the batch's messages as rows of column values. */
    private static List<List<String>> rowsOf(StreamingBatch streamingBatch) {
        return Lists.transform(streamingBatch.getMessages(), MESSAGE_TO_ROW);
    }
}
