package org.apache.kylin.provision;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/provision/BuildCubeWithStream.class */
public class BuildCubeWithStream {
    protected CubeManager cubeManager;
    private DefaultScheduler scheduler;
    protected ExecutableManager jobService;
    static final String cubeName = "test_streaming_table_cube";
    private KafkaConfig kafkaConfig;
    private MockKafka kafkaServer;
    private ZkConnection zkConnection;
    private final String kafkaZkPath = "/kylin/streaming/" + UUID.randomUUID().toString();
    private volatile boolean generateData = true;
    private volatile boolean generateDataDone = false;
    private static final int BUILD_ROUND = 5;
    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
    protected static boolean fastBuildMode = false;

    public void before() throws Exception {
        deployEnv();
        String property = System.getProperty("fastBuildMode");
        if (property == null || !property.equalsIgnoreCase("true")) {
            logger.info("Will not use fast build mode");
        } else {
            fastBuildMode = true;
            logger.info("Will use fast build mode");
        }
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        this.jobService = ExecutableManager.getInstance(instanceFromEnv);
        this.scheduler = DefaultScheduler.createInstance();
        this.scheduler.init(new JobEngineConfig(instanceFromEnv), new ZookeeperJobLock());
        if (!this.scheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        this.cubeManager = CubeManager.getInstance(instanceFromEnv);
        this.kafkaConfig = KafkaConfigManager.getInstance(instanceFromEnv).getKafkaConfig(StreamingManager.getInstance(instanceFromEnv).getStreamingConfig(CubeManager.getInstance(instanceFromEnv).getCube(cubeName).getRootFactTable()).getName());
        String uuid = UUID.randomUUID().toString();
        String localIp = NetworkUtils.getLocalIp();
        BrokerConfig brokerConfig = (BrokerConfig) ((KafkaClusterConfig) this.kafkaConfig.getKafkaClusterConfigs().get(0)).getBrokerConfigs().get(0);
        brokerConfig.setHost(localIp);
        this.kafkaConfig.setTopic(uuid);
        KafkaConfigManager.getInstance(instanceFromEnv).updateKafkaConfig(this.kafkaConfig);
        startEmbeddedKafka(uuid, brokerConfig);
    }

    private void startEmbeddedKafka(String str, BrokerConfig brokerConfig) {
        String str2 = ZookeeperUtil.getZKConnectString() + this.kafkaZkPath;
        System.out.println("zkConnectionStr" + str2);
        this.zkConnection = new ZkConnection(str2);
        this.kafkaServer = new MockKafka(this.zkConnection, brokerConfig.getPort(), brokerConfig.getId());
        this.kafkaServer.start();
        this.kafkaServer.createTopic(str, 3, 1);
        this.kafkaServer.waitTopicUntilReady(str);
        Assert.assertEquals(str, this.kafkaServer.fetchTopicMeta(str).topic());
    }

    protected void generateStreamData(long j, long j2, int i) throws IOException {
        DeployUtil.prepareTestDataForStreamingCube(j, j2, i, cubeName, new Kafka10DataLoader(this.kafkaConfig));
        logger.info("Test data inserted into Kafka");
    }

    protected void clearSegment(String str) throws Exception {
        CubeInstance cube = this.cubeManager.getCube(str);
        this.cubeManager.updateCubeDropSegments(cube, cube.getSegments());
    }

    public void build() throws Exception {
        clearSegment(cubeName);
        new Thread(new Runnable() { // from class: org.apache.kylin.provision.BuildCubeWithStream.1
            @Override // java.lang.Runnable
            public void run() {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
                long j = 0;
                try {
                    j = simpleDateFormat.parse("2012-01-01").getTime();
                } catch (ParseException e) {
                }
                Random random = new Random();
                while (BuildCubeWithStream.this.generateData) {
                    long j2 = j + 604800000;
                    try {
                        BuildCubeWithStream.this.generateStreamData(j, j2, random.nextInt(100));
                        j = j2;
                        Thread.sleep(random.nextInt(random.nextInt(30)) * 1000);
                    } catch (Exception e2) {
                        e2.printStackTrace();
                    }
                }
                BuildCubeWithStream.this.generateDataDone = true;
            }
        }).start();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < BUILD_ROUND; i++) {
            if (i == 4) {
                this.generateData = false;
                for (int i2 = 0; !this.generateDataDone && i2 < 100; i2++) {
                    Thread.sleep(1000L);
                }
                if (!this.generateDataDone) {
                    throw new IllegalStateException("Timeout when wait all messages be sent to Kafka");
                }
            } else {
                Thread.sleep(30000L);
            }
            FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() { // from class: org.apache.kylin.provision.BuildCubeWithStream.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public ExecutableState call() {
                    ExecutableState executableState = null;
                    try {
                        executableState = BuildCubeWithStream.this.buildSegment(BuildCubeWithStream.cubeName, 0L, Long.MAX_VALUE);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return executableState;
                }
            });
            newCachedThreadPool.submit(futureTask);
            newArrayList.add(futureTask);
        }
        this.generateData = false;
        newCachedThreadPool.shutdown();
        int i3 = 0;
        for (int i4 = 0; i4 < newArrayList.size(); i4++) {
            ExecutableState executableState = (ExecutableState) ((FutureTask) newArrayList.get(i4)).get(20L, TimeUnit.MINUTES);
            logger.info("Checking building task " + i4 + " whose state is " + executableState);
            Assert.assertTrue(executableState == null || executableState == ExecutableState.SUCCEED || executableState == ExecutableState.DISCARDED);
            if (executableState == ExecutableState.SUCCEED) {
                i3++;
            }
        }
        logger.info(i3 + " build jobs have been successfully completed.");
        Segments segments = this.cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
        Assert.assertTrue(segments.size() == i3);
        if (!fastBuildMode) {
            Assert.assertTrue(mergeSegment(cubeName, new SegmentRange(0L, Long.valueOf(((Long) ((CubeSegment) segments.get(segments.size() - 1)).getSegRange().end.v).longValue()))) == ExecutableState.SUCCEED);
            Segments segments2 = this.cubeManager.getCube(cubeName).getSegments();
            Assert.assertTrue(segments2.size() == 1);
            Assert.assertTrue(((CubeSegment) segments2.get(0)).getTSRange().duration() > 0);
            refreshSegment(cubeName, ((CubeSegment) segments2.get(0)).getSegRange());
            Assert.assertTrue(this.cubeManager.getCube(cubeName).getSegments().size() == 1);
        }
        logger.info("Build is done");
    }

    private ExecutableState mergeSegment(String str, SegmentRange segmentRange) throws Exception {
        DefaultChainedExecutable createBatchMergeJob = EngineFactory.createBatchMergeJob(this.cubeManager.mergeSegments(this.cubeManager.getCube(str), (SegmentRange.TSRange) null, segmentRange, false), "TEST");
        this.jobService.addJob(createBatchMergeJob);
        waitForJob(createBatchMergeJob.getId());
        return createBatchMergeJob.getStatus();
    }

    private String refreshSegment(String str, SegmentRange segmentRange) throws Exception {
        DefaultChainedExecutable createBatchCubingJob = EngineFactory.createBatchCubingJob(this.cubeManager.refreshSegment(this.cubeManager.getCube(str), (SegmentRange.TSRange) null, segmentRange), "TEST");
        this.jobService.addJob(createBatchCubingJob);
        waitForJob(createBatchCubingJob.getId());
        return createBatchCubingJob.getId();
    }

    protected ExecutableState buildSegment(String str, long j, long j2) throws Exception {
        CubeInstance cube = this.cubeManager.getCube(str);
        DefaultChainedExecutable createBatchCubingJob = EngineFactory.createBatchCubingJob(this.cubeManager.appendSegment(this.cubeManager.getCube(str), SourceManager.getSource(cube).enrichSourcePartitionBeforeBuild(cube, new SourcePartition((SegmentRange.TSRange) null, new SegmentRange(Long.valueOf(j), Long.valueOf(j2)), (Map) null, (Map) null))), "TEST");
        this.jobService.addJob(createBatchCubingJob);
        waitForJob(createBatchCubingJob.getId());
        return createBatchCubingJob.getStatus();
    }

    protected void deployEnv() throws IOException {
        DeployUtil.overrideJobJarLocations();
    }

    public static void beforeClass() throws Exception {
        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
        System.setProperty("KYLIN_CONF", HBaseMetadataTestCase.SANDBOX_TEST_DATA);
        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
        }
        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
    }

    public void after() {
        this.kafkaServer.stop();
        cleanKafkaZkPath(this.kafkaZkPath);
        DefaultScheduler.destroyInstance();
    }

    private void cleanKafkaZkPath(String str) {
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), new ExponentialBackoffRetry(1000, 3));
        newClient.start();
        try {
            try {
                newClient.delete().deletingChildrenIfNeeded().forPath(this.kafkaZkPath);
                newClient.close();
            } catch (Exception e) {
                logger.warn("Failed to delete zookeeper path: " + str, e);
                newClient.close();
            }
        } catch (Throwable th) {
            newClient.close();
            throw th;
        }
    }

    protected void waitForJob(String str) {
        while (true) {
            AbstractExecutable job = this.jobService.getJob(str);
            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
                return;
            }
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void cleanup() throws Exception {
        cleanupOldStorage();
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    protected void cleanupOldStorage() throws Exception {
        new StorageCleanupJob().execute(new String[]{"--delete", "true"});
    }

    public static void main(String[] strArr) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        BuildCubeWithStream buildCubeWithStream = null;
        try {
            try {
                beforeClass();
                buildCubeWithStream = new BuildCubeWithStream();
                buildCubeWithStream.before();
                buildCubeWithStream.build();
                logger.info("Build is done");
                if (buildCubeWithStream != null) {
                    buildCubeWithStream.after();
                    buildCubeWithStream.cleanup();
                }
                logger.info("Going to exit");
            } catch (Throwable th) {
                logger.error("error", th);
                i = 1;
                if (buildCubeWithStream != null) {
                    buildCubeWithStream.after();
                    buildCubeWithStream.cleanup();
                }
                logger.info("Going to exit");
            }
            System.out.println("Time elapsed: " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + " sec - in " + BuildCubeWithStream.class.getName());
            System.exit(i);
        } catch (Throwable th2) {
            if (buildCubeWithStream != null) {
                buildCubeWithStream.after();
                buildCubeWithStream.cleanup();
            }
            logger.info("Going to exit");
            throw th2;
        }
    }
}
