/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.provision;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Locale;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.job.streaming.StreamDataLoader;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.provision.MockKafka;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BuildCubeWithStream {
    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
    protected CubeManager cubeManager;
    private DefaultScheduler scheduler;
    protected ExecutableManager jobService;
    static final String cubeName = "test_streaming_table_cube";
    static final String joinTableCubeName = "test_streaming_join_table_cube";
    private KafkaConfig kafkaConfig;
    private MockKafka kafkaServer;
    private ZkConnection zkConnection;
    private final String kafkaZkPath = ZKUtil.getZkRootBasedPath((String)"streaming") + "/" + RandomUtil.randomUUID().toString();
    protected static boolean fastBuildMode = false;
    private volatile boolean generateData = true;
    private volatile boolean generateDataDone = false;
    private static final int BUILD_ROUND = 4;

    public void before() throws Exception {
        this.deployEnv();
        String fastModeStr = System.getProperty("fastBuildMode");
        if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) {
            fastBuildMode = true;
            logger.info("Will use fast build mode");
        } else {
            logger.info("Will not use fast build mode");
        }
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        this.jobService = ExecutableManager.getInstance((KylinConfig)kylinConfig);
        this.scheduler = DefaultScheduler.createInstance();
        this.scheduler.init(new JobEngineConfig(kylinConfig), (JobLock)new ZookeeperJobLock());
        if (!this.scheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        this.cubeManager = CubeManager.getInstance((KylinConfig)kylinConfig);
        CubeInstance cubeInstance = CubeManager.getInstance((KylinConfig)kylinConfig).getCube(cubeName);
        String factTable = cubeInstance.getRootFactTable();
        StreamingManager streamingManager = StreamingManager.getInstance((KylinConfig)kylinConfig);
        StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
        this.kafkaConfig = KafkaConfigManager.getInstance((KylinConfig)kylinConfig).getKafkaConfig(streamingConfig.getName());
        String topicName = RandomUtil.randomUUID().toString();
        BrokerConfig brokerConfig = (BrokerConfig)((KafkaClusterConfig)this.kafkaConfig.getKafkaClusterConfigs().get(0)).getBrokerConfigs().get(0);
        this.kafkaConfig.setTopic(topicName);
        KafkaConfigManager.getInstance((KylinConfig)kylinConfig).updateKafkaConfig(this.kafkaConfig);
        this.startEmbeddedKafka(topicName, brokerConfig);
    }

    private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
        String zkConnectionStr = ZKUtil.getZKConnectString((KylinConfig)KylinConfig.getInstanceFromEnv()) + this.kafkaZkPath;
        System.out.println("zkConnectionStr" + zkConnectionStr);
        this.zkConnection = new ZkConnection(zkConnectionStr);
        this.kafkaServer = new MockKafka(this.zkConnection, brokerConfig.getHost() + ":" + brokerConfig.getPort(), brokerConfig.getId());
        this.kafkaServer.start();
        this.kafkaServer.createTopic(topicName, 3, 1);
        this.kafkaServer.waitTopicUntilReady(topicName);
        MetadataResponse.TopicMetadata topicMetadata = this.kafkaServer.fetchTopicMeta(topicName);
        Assert.assertEquals((Object)topicName, (Object)topicMetadata.topic());
    }

    protected void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
        Kafka10DataLoader dataLoader = new Kafka10DataLoader(this.kafkaConfig);
        DeployUtil.prepareTestDataForStreamingCube((long)startTime, (long)endTime, (int)numberOfRecords, (String)cubeName, (StreamDataLoader)dataLoader);
        logger.info("Test data inserted into Kafka");
    }

    protected void clearSegment(String cubeName) throws Exception {
        CubeInstance cube = this.cubeManager.getCube(cubeName);
        this.cubeManager.updateCubeDropSegments(cube, (Collection)cube.getSegments());
    }

    public void build() throws Exception {
        this.clearSegment(cubeName);
        this.clearSegment(joinTableCubeName);
        new Thread(new Runnable(){

            @Override
            public void run() {
                SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
                f.setTimeZone(TimeZone.getTimeZone("GMT"));
                long dateStart = 0L;
                try {
                    dateStart = f.parse("2012-01-01").getTime();
                }
                catch (ParseException parseException) {
                    // empty catch block
                }
                Random rand = new Random();
                while (BuildCubeWithStream.this.generateData) {
                    long dateEnd = dateStart + 604800000L;
                    try {
                        BuildCubeWithStream.this.generateStreamData(dateStart, dateEnd, rand.nextInt(100));
                        dateStart = dateEnd;
                        Thread.sleep(rand.nextInt(rand.nextInt(30)) * 1000);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                BuildCubeWithStream.this.generateDataDone = true;
            }
        }).start();
        ExecutorService executorService = Executors.newCachedThreadPool();
        ArrayList futures = Lists.newArrayList();
        for (int i = 0; i < 4; ++i) {
            if (i == 3) {
                this.generateData = false;
                for (int waittime = 0; !this.generateDataDone && waittime < 100; ++waittime) {
                    Thread.sleep(1000L);
                }
                if (!this.generateDataDone) {
                    throw new IllegalStateException("Timeout when wait all messages be sent to Kafka");
                }
            } else {
                Thread.sleep(30000L);
            }
            FutureTask<ExecutableState> futureTask = new FutureTask<ExecutableState>(new StreamOffsetCallable(cubeName, 0L, Long.MAX_VALUE));
            executorService.submit(futureTask);
            futures.add(futureTask);
        }
        Thread.sleep(30000L);
        CubeInstance cube = this.cubeManager.getCube(cubeName);
        CubeSegment firstSeg = cube.getFirstSegment();
        CubeSegment lastSeg = cube.getLastSegment();
        SourcePartition sourcePartition = new SourcePartition(null, new SegmentRange(firstSeg.getSegRange().start, lastSeg.getSegRange().end), firstSeg.getSourcePartitionOffsetStart(), lastSeg.getSourcePartitionOffsetEnd());
        FutureTask<ExecutableState> futureTask = new FutureTask<ExecutableState>(new StreamSourcePartitionCallable(joinTableCubeName, sourcePartition));
        executorService.submit(futureTask);
        futures.add(futureTask);
        this.generateData = false;
        executorService.shutdown();
        int succeedBuild = 0;
        for (int i = 0; i < futures.size(); ++i) {
            ExecutableState result = (ExecutableState)((FutureTask)futures.get(i)).get(20L, TimeUnit.MINUTES);
            logger.info("Checking building task " + i + " whose state is " + result);
            Assert.assertTrue((result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED ? 1 : 0) != 0);
            if (result != ExecutableState.SUCCEED) continue;
            ++succeedBuild;
        }
        logger.info(succeedBuild + " build jobs have been successfully completed.");
        Segments segments = this.cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
        Segments joinTableSegments = this.cubeManager.getCube(joinTableCubeName).getSegments(SegmentStatusEnum.READY);
        Assert.assertTrue((segments.size() + joinTableSegments.size() == succeedBuild ? 1 : 0) != 0);
        if (!fastBuildMode) {
            long endOffset = (Long)((CubeSegment)segments.get((int)(segments.size() - 1))).getSegRange().end.v;
            ExecutableState result = this.mergeSegment(cubeName, new SegmentRange((Comparable)Long.valueOf(0L), (Comparable)Long.valueOf(endOffset)));
            Assert.assertTrue((result == ExecutableState.SUCCEED ? 1 : 0) != 0);
            segments = this.cubeManager.getCube(cubeName).getSegments();
            Assert.assertTrue((segments.size() == 1 ? 1 : 0) != 0);
            SegmentRange.TSRange tsRange = ((CubeSegment)segments.get(0)).getTSRange();
            Assert.assertTrue((tsRange.duration() > 0L ? 1 : 0) != 0);
            CubeSegment toRefreshSeg = (CubeSegment)segments.get(0);
            this.refreshSegment(cubeName, toRefreshSeg.getSegRange());
            segments = this.cubeManager.getCube(cubeName).getSegments();
            Assert.assertTrue((segments.size() == 1 ? 1 : 0) != 0);
        }
        logger.info("Build is done");
    }

    private ExecutableState mergeSegment(String cubeName, SegmentRange segRange) throws Exception {
        CubeSegment segment = this.cubeManager.mergeSegments(this.cubeManager.getCube(cubeName), null, segRange, false);
        DefaultChainedExecutable job = EngineFactory.createBatchMergeJob((CubeSegment)segment, (String)"TEST");
        this.jobService.addJob((AbstractExecutable)job);
        this.waitForJob(job.getId());
        return job.getStatus();
    }

    private String refreshSegment(String cubeName, SegmentRange segRange) throws Exception {
        CubeSegment segment = this.cubeManager.refreshSegment(this.cubeManager.getCube(cubeName), null, segRange);
        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob((CubeSegment)segment, (String)"TEST");
        this.jobService.addJob((AbstractExecutable)job);
        this.waitForJob(job.getId());
        return job.getId();
    }

    protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
        CubeInstance cubeInstance = this.cubeManager.getCube(cubeName);
        ISource source = SourceManager.getSource((ISourceAware)cubeInstance);
        SourcePartition partition = source.enrichSourcePartitionBeforeBuild((IBuildable)cubeInstance, new SourcePartition(null, new SegmentRange((Comparable)Long.valueOf(startOffset), (Comparable)Long.valueOf(endOffset)), null, null));
        return this.buildSegment(cubeName, partition);
    }

    protected ExecutableState buildSegment(String cubeName, SourcePartition partition) throws Exception {
        logger.info("SourcePartition: {}", (Object)partition.toString());
        CubeSegment segment = this.cubeManager.appendSegment(this.cubeManager.getCube(cubeName), partition);
        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob((CubeSegment)segment, (String)"TEST");
        this.jobService.addJob((AbstractExecutable)job);
        this.waitForJob(job.getId());
        return job.getStatus();
    }

    protected void deployEnv() throws Exception {
        DeployUtil.overrideJobJarLocations();
        DeployUtil.deployTablesInModelWithExclusiveTables((String)"test_streaming_join_table_model", (String[])new String[]{"DEFAULT.STREAMING_TABLE"});
    }

    public static void beforeClass() throws Exception {
        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
        ClassUtil.addClasspath((String)new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
        System.setProperty("KYLIN_CONF", HBaseMetadataTestCase.SANDBOX_TEST_DATA);
        if (StringUtils.isEmpty((CharSequence)System.getProperty("hdp.version"))) {
            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
        }
        HBaseMetadataTestCase.staticCreateTestMetadata((String)HBaseMetadataTestCase.SANDBOX_TEST_DATA);
    }

    public void after() {
        if (this.kafkaServer != null) {
            this.kafkaServer.stop();
        }
        ZKUtil.cleanZkPath((String)this.kafkaZkPath);
        DefaultScheduler.destroyInstance();
    }

    protected void waitForJob(String jobId) {
        AbstractExecutable job;
        while ((job = this.jobService.getJob(jobId)).getStatus() != ExecutableState.SUCCEED && job.getStatus() != ExecutableState.ERROR && job.getStatus() != ExecutableState.DISCARDED) {
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void cleanup() throws Exception {
        this.cleanupOldStorage();
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    protected void cleanupOldStorage() throws Exception {
        String[] args = new String[]{"--delete", "true"};
        StorageCleanupJob cli = new StorageCleanupJob();
        cli.execute(args);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        int exitCode = 0;
        BuildCubeWithStream buildCubeWithStream = null;
        try {
            BuildCubeWithStream.beforeClass();
            buildCubeWithStream = new BuildCubeWithStream();
            buildCubeWithStream.before();
            buildCubeWithStream.build();
            logger.info("Build is done");
        }
        catch (Throwable e) {
            logger.error("error", e);
            exitCode = 1;
        }
        finally {
            if (buildCubeWithStream != null) {
                buildCubeWithStream.after();
                buildCubeWithStream.cleanup();
            }
            logger.info("Going to exit");
        }
        long millis = System.currentTimeMillis() - start;
        System.out.println("Time elapsed: " + millis / 1000L + " sec - in " + BuildCubeWithStream.class.getName());
        System.exit(exitCode);
    }

    class StreamSourcePartitionCallable
    implements Callable<ExecutableState> {
        private final String cubeName;
        private final SourcePartition sourcePartition;

        public StreamSourcePartitionCallable(String cubeName, SourcePartition sourcePartition) {
            this.cubeName = cubeName;
            this.sourcePartition = sourcePartition;
        }

        @Override
        public ExecutableState call() throws Exception {
            ExecutableState result = null;
            try {
                result = BuildCubeWithStream.this.buildSegment(this.cubeName, this.sourcePartition);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            return result;
        }
    }

    class StreamOffsetCallable
    implements Callable<ExecutableState> {
        private final String cubeName;
        private final long startOffset;
        private final long endOffset;

        public StreamOffsetCallable(String cubeName, long startOffset, long endOffset) {
            this.cubeName = cubeName;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }

        @Override
        public ExecutableState call() throws Exception {
            ExecutableState result = null;
            try {
                result = BuildCubeWithStream.this.buildSegment(this.cubeName, this.startOffset, this.endOffset);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            return result;
        }
    }
}

