package org.apache.kylin.provision;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.hive.HiveCmdBuilder;
import org.apache.kylin.source.hive.HiveTableReader;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/provision/BuildIIWithStream.class */
public class BuildIIWithStream {
    private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStream.class);
    private static final String[] II_NAME = {"test_kylin_ii_left_join", "test_kylin_ii_inner_join"};
    private IIManager iiManager;
    private KylinConfig kylinConfig;

    public static void main(String[] strArr) throws Exception {
        try {
            beforeClass();
            BuildIIWithStream buildIIWithStream = new BuildIIWithStream();
            buildIIWithStream.before();
            buildIIWithStream.build();
            logger.info("Build is done");
            afterClass();
            logger.info("Going to exit");
            System.exit(0);
        } catch (Exception e) {
            logger.error("error", e);
            System.exit(1);
        }
    }

    public static void beforeClass() throws Exception {
        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
        }
        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
    }

    protected void deployEnv() throws Exception {
        DeployUtil.overrideJobJarLocations();
    }

    public void before() throws Exception {
        deployEnv();
        this.kylinConfig = KylinConfig.getInstanceFromEnv();
        this.iiManager = IIManager.getInstance(this.kylinConfig);
        for (String str : II_NAME) {
            IIInstance ii = this.iiManager.getII(str);
            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
                ii.setStatus(RealizationStatusEnum.DISABLED);
                this.iiManager.updateII(ii);
            }
        }
    }

    public static void afterClass() throws Exception {
        cleanupOldStorage();
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    private String createIntermediateTable(IIDesc iIDesc, KylinConfig kylinConfig) throws IOException {
        IIJoinedFlatTableDesc iIJoinedFlatTableDesc = new IIJoinedFlatTableDesc(iIDesc);
        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
        String uuid = UUID.randomUUID().toString();
        String str = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
        String generateDropTableStatement = JoinedFlatTable.generateDropTableStatement(iIJoinedFlatTableDesc);
        String generateCreateTableStatement = JoinedFlatTable.generateCreateTableStatement(iIJoinedFlatTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
        try {
            String generateInsertDataStatement = JoinedFlatTable.generateInsertDataStatement(iIJoinedFlatTableDesc, jobEngineConfig);
            ShellExecutable shellExecutable = new ShellExecutable();
            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
            hiveCmdBuilder.addStatement(str);
            hiveCmdBuilder.addStatement(generateDropTableStatement);
            hiveCmdBuilder.addStatement(generateCreateTableStatement);
            hiveCmdBuilder.addStatement(generateInsertDataStatement);
            shellExecutable.setCmd(hiveCmdBuilder.build());
            logger.info(shellExecutable.getCmd());
            shellExecutable.setName("Create Intermediate Flat Hive Table");
            kylinConfig.getCliCommandExecutor().execute(shellExecutable.getCmd(), (org.apache.kylin.common.util.Logger) null);
            return iIJoinedFlatTableDesc.getTableName();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
        }
    }

    private void clearSegment(String str) throws Exception {
        IIInstance ii = this.iiManager.getII(str);
        ii.getSegments().clear();
        this.iiManager.updateII(ii);
    }

    private IISegment createSegment(String str) throws Exception {
        clearSegment(str);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
        return buildSegment(str, 0L, simpleDateFormat.parse("2015-01-01").getTime());
    }

    private IISegment buildSegment(String str, long j, long j2) throws Exception {
        IIInstance ii = this.iiManager.getII(str);
        IISegment buildSegment = this.iiManager.buildSegment(ii, j, j2);
        ii.getSegments().add(buildSegment);
        this.iiManager.updateII(ii);
        return buildSegment;
    }

    private void buildII(String str) throws Exception {
        IIDesc descriptor = this.iiManager.getII(str).getDescriptor();
        String createIntermediateTable = createIntermediateTable(descriptor, this.kylinConfig);
        logger.info("intermediate table name:" + createIntermediateTable);
        HiveTableReader hiveTableReader = new HiveTableReader("default", createIntermediateTable);
        for (TblColRef tblColRef : descriptor.listAllColumns()) {
            if (descriptor.isMetricsCol(tblColRef)) {
                logger.info("matrix:" + tblColRef.getName());
            } else {
                logger.info("measure:" + tblColRef.getName());
            }
        }
        IISegment createSegment = createSegment(str);
        HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(createSegment.getStorageLocationIdentifier());
        ToolRunner.run(new IICreateHTableJob(), new String[]{"-iiname", str, "-htablename", createSegment.getStorageLocationIdentifier()});
        IIDesc iIDesc = createSegment.getIIDesc();
        SliceBuilder sliceBuilder = new SliceBuilder(descriptor, (short) 0);
        List<String[]> sortedRows = getSortedRows(hiveTableReader, descriptor.getTimestampColumn());
        int size = sortedRows.size();
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String[]> it = sortedRows.iterator();
        while (it.hasNext()) {
            newArrayList.add(parse(it.next()));
            if (newArrayList.size() >= iIDesc.getSliceSize()) {
                build(sliceBuilder, new StreamingBatch(newArrayList, Pair.newPair(Long.valueOf(System.currentTimeMillis()), Long.valueOf(System.currentTimeMillis()))), table);
                newArrayList.clear();
            }
        }
        if (!newArrayList.isEmpty()) {
            build(sliceBuilder, new StreamingBatch(newArrayList, Pair.newPair(Long.valueOf(System.currentTimeMillis()), Long.valueOf(System.currentTimeMillis()))), table);
        }
        hiveTableReader.close();
        logger.info("total record count:" + size + " htable:" + createSegment.getStorageLocationIdentifier());
        logger.info("stream build finished, htable name:" + createSegment.getStorageLocationIdentifier());
    }

    public void build() throws Exception {
        for (String str : II_NAME) {
            buildII(str);
            IIInstance ii = this.iiManager.getII(str);
            if (ii.getStatus() != RealizationStatusEnum.READY) {
                ii.setStatus(RealizationStatusEnum.READY);
                this.iiManager.updateII(ii);
            }
        }
    }

    private void build(SliceBuilder sliceBuilder, StreamingBatch streamingBatch, HTableInterface hTableInterface) throws IOException {
        Slice buildSlice = sliceBuilder.buildSlice(streamingBatch);
        try {
            loadToHBase(hTableInterface, buildSlice, new IIKeyValueCodec(buildSlice.getInfo()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void loadToHBase(HTableInterface hTableInterface, Slice slice, IIKeyValueCodec iIKeyValueCodec) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        for (IIRow iIRow : iIKeyValueCodec.encodeKeyValue(slice)) {
            byte[] bArr = iIRow.getKey().get();
            byte[] bArr2 = iIRow.getValue().get();
            Put put = new Put(bArr);
            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, bArr2);
            ImmutableBytesWritable dictionary = iIRow.getDictionary();
            byte[] bArr3 = dictionary.get();
            if (dictionary.getOffset() != 0 || dictionary.getLength() != bArr3.length) {
                throw new RuntimeException("dict offset should be 0, and dict length should be " + bArr3.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
            }
            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, bArr3);
            newArrayList.add(put);
        }
        hTableInterface.put(newArrayList);
    }

    private StreamingMessage parse(String[] strArr) {
        return new StreamingMessage(Lists.newArrayList(strArr), System.currentTimeMillis(), System.currentTimeMillis(), Collections.emptyMap());
    }

    private List<String[]> getSortedRows(HiveTableReader hiveTableReader, final int i) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        while (hiveTableReader.next()) {
            newArrayList.add(hiveTableReader.getRow());
        }
        Collections.sort(newArrayList, new Comparator<String[]>() { // from class: org.apache.kylin.provision.BuildIIWithStream.1
            @Override // java.util.Comparator
            public int compare(String[] strArr, String[] strArr2) {
                return Long.compare(DateFormat.stringToMillis(strArr[i]), DateFormat.stringToMillis(strArr2[i]));
            }
        });
        return newArrayList;
    }

    private static int cleanupOldStorage() throws Exception {
        return ToolRunner.run(new StorageCleanupJob(), new String[]{"--delete", "true"});
    }
}
