/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.realtime;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.provision.MockKafka;
import org.apache.kylin.query.KylinTestBase;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.stream.coordinator.Coordinator;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.HealthCheckInfo;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
import org.apache.kylin.stream.core.model.StopConsumersRequest;
import org.apache.kylin.stream.core.model.UnAssignRequest;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.server.StreamingServer;
import org.apache.kylin.stream.source.kafka.KafkaSource;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BuildCubeWithStreamV2
extends KylinTestBase {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamV2.class);
    /** Name of the cube that this driver assigns, waits on, and queries. */
    private static final String CUBE_NAME = "test_streaming_v2_user_info_cube";
    /**
     * ZooKeeper path used for the embedded Kafka broker: the "streamingv2" root-based path
     * followed by a random UUID. It is appended to the ZK connect string in
     * {@code startEmbeddedKafka} and removed again in {@code after()}.
     */
    private final String kafkaZkPath = ZKUtil.getZkRootBasedPath((String)"streamingv2") + "/" + RandomUtil.randomUUID().toString();
    /** Relative path of the text file holding the messages to publish. */
    private final String messageFile = "src/test/resources/streaming_v2_user_info_messages.txt";
    /** Kafka topic name; resolved from the streaming source config in {@code before()}. */
    private String topicName;
    /** Coordinator obtained from {@code Coordinator.getInstance()}; wired up in the constructor. */
    private Coordinator coordinator = Coordinator.getInstance();
    /** Streaming server obtained in the constructor and started in {@code before()}. */
    private StreamingServer streamingServer;
    /** Job scheduler created and initialised in {@code before()}. */
    private DefaultScheduler scheduler;
    /** Embedded Kafka broker; stays {@code null} until {@code startEmbeddedKafka} has run. */
    private MockKafka kafkaServer;
    /** Producer created at the end of {@code before()} and used by the thread started in {@code produce}. */
    private KafkaProducer<byte[], byte[]> producer;
    /** ID of the replica set created in {@code before()}. */
    private int replicaSetID;
    /**
     * Set to {@code true} by the producer thread once all messages have been sent.
     * Declared volatile because {@code beforeQuery()} reads it from a different thread.
     */
    private volatile boolean generateDataDone = false;

    public BuildCubeWithStreamV2() {
        this.coordinator.setToLeader();
        this.streamingServer = StreamingServer.getInstance();
        this.streamingServer.setCoordinatorClient((CoordinatorClient)this.coordinator);
        this.coordinator.setReceiverAdminClient((ReceiverAdminClient)new MockedReceiverAdminClient(this.streamingServer));
    }

    /**
     * Runs the class-level setup using {@code HBaseMetadataTestCase.SANDBOX_TEST_DATA}
     * as the configuration directory.
     *
     * @throws Exception if the setup performed by {@link #beforeClass(String)} fails
     */
    public static void beforeClass() throws Exception {
        beforeClass(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
    }

    /**
     * Class-level setup for a given configuration directory.
     *
     * <p>Adds the directory to the classpath, points the {@code KYLIN_CONF} and
     * {@code kylin.hadoop.conf.dir} system properties at it, requires
     * {@code hdp.version} to be set, enables stand-alone stream mode, runs
     * {@code setupAll()}, creates the HDFS working directory and finally cleans
     * the stream ZooKeeper root.
     *
     * @param confDir configuration directory to use
     * @throws RuntimeException if {@code hdp.version} is not set or the HDFS working
     *                          directory cannot be created
     * @throws Exception        if any other setup step fails
     */
    public static void beforeClass(String confDir) throws Exception {
        String confPath = new File(confDir).getAbsolutePath();
        logger.info("Adding to classpath: " + confPath);
        ClassUtil.addClasspath(confPath);

        System.setProperty("KYLIN_CONF", confDir);
        System.setProperty("kylin.hadoop.conf.dir", confDir);

        String hdpVersion = System.getProperty("hdp.version");
        if (StringUtils.isEmpty(hdpVersion)) {
            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
        }

        System.setProperty("kylin.stream.stand-alone.mode", "true");
        setupAll();

        try {
            FileSystem fs = HadoopUtil.getWorkingFileSystem();
            Path workingDir = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
            if (!fs.mkdirs(workingDir)) {
                throw new IOException("mkdir fails, please check hdfs permission");
            }
        } catch (IOException e) {
            // Covers both a failing file-system call and the explicit mkdir failure above.
            String workingDirName = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
            throw new RuntimeException("failed to create kylin.env.hdfs-working-dir, Please make sure the user has right to access " + workingDirName, e);
        }

        cleanStreamZkRoot();
    }

    /**
     * Class-level teardown: delegates to
     * {@code HBaseMetadataTestCase.staticCleanupTestMetadata()}.
     */
    public static void afterClass() {
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    /**
     * Per-run setup. In order: deploys the environment, starts the job scheduler,
     * resolves the Kafka topic and bootstrap servers from the cube's streaming
     * source config, starts the embedded Kafka broker and the streaming server,
     * creates a single-node replica set, marks the cube READY, assigns it to the
     * coordinator and finally creates the Kafka producer.
     *
     * @throws RuntimeException if the scheduler reports that it has not started
     * @throws Exception        if any setup step fails
     */
    public void before() throws Exception {
        this.deployEnv();

        // Job scheduler
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        this.scheduler = DefaultScheduler.createInstance();
        this.scheduler.init(new JobEngineConfig(config), (JobLock)new ZookeeperJobLock());
        if (!this.scheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }

        // Kafka topic and broker address come from the streaming config of the cube's fact table.
        CubeInstance cube = CubeManager.getInstance(config).getCube(CUBE_NAME);
        String factTable = cube.getRootFactTable();
        StreamingSourceConfig streamingConfig = StreamingSourceConfigManager.getInstance(config).getConfig(factTable);
        this.topicName = KafkaSource.getTopicName((Map)streamingConfig.getProperties());
        String bootstrapServers = KafkaSource.getBootstrapServers((Map)streamingConfig.getProperties());
        // Only the first listed server is used for the embedded broker, with broker id 0.
        String firstServer = bootstrapServers.split(",")[0];
        this.startEmbeddedKafka(this.topicName, firstServer, 0);

        // Streaming receiver and its replica set
        this.streamingServer.start();
        ReplicaSet rs = new ReplicaSet();
        rs.addNode(new Node());
        this.coordinator.createReplicaSet(rs);
        this.replicaSetID = rs.getReplicaSetID();
        this.streamingServer.becomeLeader();

        // Enable the cube and hand it to the coordinator
        CubeManager.getInstance(config).updateCubeStatus(cube, RealizationStatusEnum.READY);
        this.coordinator.assignCube(CUBE_NAME);

        // Producer with byte[] keys and values
        String serializerClass = ByteArraySerializer.class.getName();
        Properties producerOverrides = new Properties();
        producerOverrides.put("key.serializer", serializerClass);
        producerOverrides.put("value.serializer", serializerClass);
        this.producer = new KafkaProducer<>(Kafka10DataLoader.constructDefaultKafkaProducerProperties(bootstrapServers, producerOverrides));
    }

    /**
     * Starts a background thread that publishes up to {@code size} lines of the
     * message file to the Kafka topic, one record per line, pausing 50 ms after
     * each send. The record key is the line index and the value is the line text,
     * both UTF-8 encoded.
     *
     * <p>{@code generateDataDone} is set only when the loop finishes normally.
     * Any failure is logged (instead of being silently dropped) and leaves the
     * flag {@code false}, so {@code beforeQuery()} reports the timeout. The
     * producer is closed when the thread finishes, whatever the outcome.
     *
     * @param size maximum number of messages to send
     * @throws IOException never thrown synchronously; kept for signature compatibility
     */
    public void produce(final int size) throws IOException {
        new Thread(new Runnable() {

            @Override
            public void run() {
                logger.info("Generate data start");
                try (BufferedReader messageReader = Files.newBufferedReader(
                        Paths.get(BuildCubeWithStreamV2.this.messageFile), StandardCharsets.UTF_8)) {
                    String message;
                    for (int index = 0; index < size && (message = messageReader.readLine()) != null; ++index) {
                        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                                BuildCubeWithStreamV2.this.topicName,
                                String.valueOf(index).getBytes(StandardCharsets.UTF_8),
                                message.getBytes(StandardCharsets.UTF_8));
                        BuildCubeWithStreamV2.this.producer.send(record);
                        // Throttle so the data arrives as a stream rather than one burst.
                        Thread.sleep(50L);
                    }
                    BuildCubeWithStreamV2.this.generateDataDone = true;
                    logger.info("Generate data done");
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can see it.
                    Thread.currentThread().interrupt();
                    logger.error("Generate data interrupted", e);
                } catch (Exception e) {
                    logger.error("Generate data failed", e);
                } finally {
                    BuildCubeWithStreamV2.this.producer.close();
                }
            }
        }).start();
    }

    /**
     * Blocks until the streamed data is ready to be queried.
     *
     * <p>Sleeps 500 s up front, then waits in three stages, polling each condition
     * between sleeps: up to 2 x 60 s for the producer thread to finish, up to
     * 5 x 60 s for the count query to match, and up to 5 x 120 s for a ready
     * segment to appear.
     *
     * @throws IllegalStateException if any stage is still unsatisfied after its last poll
     * @throws Exception             if sleeping is interrupted or a check fails unexpectedly
     */
    public void beforeQuery() throws Exception {
        Thread.sleep(500000L);

        // Stage 1: all messages handed to Kafka.
        int attempts = 0;
        while (!this.generateDataDone && attempts < 2) {
            Thread.sleep(60000L);
            attempts++;
        }
        if (!this.generateDataDone) {
            throw new IllegalStateException("Timeout when wait all messages be sent to Kafka");
        }

        // Stage 2: receiver has consumed everything (checked once before any sleep).
        attempts = 0;
        boolean consumed = this.isConsumeDataDone();
        while (!consumed && attempts < 5) {
            Thread.sleep(60000L);
            attempts++;
            consumed = this.isConsumeDataDone();
        }
        if (!consumed) {
            throw new IllegalStateException("Exec timeout, data not be comsumed completely");
        }

        // Stage 3: at least one segment has been built.
        attempts = 0;
        boolean segmentReady = this.isSegmentBuildSuccess();
        while (!segmentReady && attempts < 5) {
            Thread.sleep(120000L);
            attempts++;
            segmentReady = this.isSegmentBuildSuccess();
        }
        if (!segmentReady) {
            throw new IllegalStateException("Build failed/timeout, no ready segment");
        }
    }

    /**
     * Runs the streaming verification queries: first the folder whose results are
     * compared, then (only if that succeeded) the folder that is executed without
     * comparison.
     *
     * @throws IllegalStateException if either query folder fails; the individual
     *                               failure has already been written to the error log
     * @throws Exception             if a query helper fails unexpectedly
     */
    public void query() throws Exception {
        boolean success = this.execAndCompSuccess("src/test/resources/query/sql_streaming_v2/compare_result", null, true)
                && this.execSuccess("src/test/resources/query/sql_streaming_v2/not_compare_result");
        if (!success) {
            // The previous text here was copied from the segment-build check and misreported the cause.
            throw new IllegalStateException("Query execution or result comparison failed, see the error log for details");
        }
    }

    /**
     * Per-run teardown: unassigns the cube, removes the streaming server from its
     * replica set, stops the embedded Kafka broker (if one was started), removes
     * the broker's ZooKeeper path and destroys the scheduler instance.
     *
     * <p>The steps run in the same order as before, but each later step is now in
     * a {@code finally} block so that a failure early on no longer leaves the
     * broker running, the ZooKeeper path behind, or the scheduler alive.
     *
     * @throws Exception if a teardown step fails; the remaining steps still run
     */
    public void after() throws Exception {
        try {
            this.coordinator.unAssignCube(CUBE_NAME);
            this.streamingServer.removeFromReplicaSet();
        } finally {
            try {
                if (this.kafkaServer != null) {
                    this.kafkaServer.stop();
                }
            } finally {
                try {
                    ZKUtil.cleanZkPath(this.kafkaZkPath);
                } finally {
                    DefaultScheduler.destroyInstance();
                }
            }
        }
    }

    /**
     * Final cleanup: runs the storage cleanup job and then removes the test metadata.
     *
     * @throws Exception if the storage cleanup fails
     */
    public void cleanup() throws Exception {
        cleanupOldStorage();
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    /**
     * Runs {@code execAndCompQuery} on a query folder and turns its outcome into a flag.
     *
     * @param queryFolder     folder containing the queries to run
     * @param exclusiveQuerys passed through to {@code execAndCompQuery}
     * @param needSort        passed through to {@code execAndCompQuery}
     * @return {@code true} if the call completed, {@code false} if it threw an
     *         {@link Exception} (which is logged)
     */
    private boolean execAndCompSuccess(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception {
        boolean passed;
        try {
            this.execAndCompQuery(queryFolder, exclusiveQuerys, needSort);
            passed = true;
        } catch (Exception failure) {
            logger.error("Exec query and compare result failed", failure);
            passed = false;
        }
        return passed;
    }

    /**
     * Runs {@code batchExecuteQuery} on a query folder and turns its outcome into a flag.
     *
     * @param queryFolder folder containing the queries to run
     * @return {@code true} if the call completed, {@code false} if it threw an
     *         {@link Exception} (which is logged)
     */
    private boolean execSuccess(String queryFolder) throws Exception {
        boolean passed;
        try {
            this.batchExecuteQuery(queryFolder);
            passed = true;
        } catch (Exception failure) {
            logger.error("Exec query failed", failure);
            passed = false;
        }
        return passed;
    }

    /**
     * Checks whether the streamed data has been consumed by running the queries in
     * the "count" folder with result comparison (no sorting).
     *
     * @return {@code true} if the count queries ran and compared without an exception
     */
    private boolean isConsumeDataDone() throws Exception {
        final String countQueryFolder = "src/test/resources/query/sql_streaming_v2/count";
        return this.execAndCompSuccess(countQueryFolder, null, false);
    }

    /**
     * Reports whether the cube currently has a latest ready segment.
     *
     * @return {@code true} if {@code getLatestReadySegment()} is non-null for {@code CUBE_NAME}
     */
    private boolean isSegmentBuildSuccess() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance(config).getCube(CUBE_NAME);
        return null != cube.getLatestReadySegment();
    }

    /**
     * Runs a {@code StorageCleanupJob} with the arguments {@code --delete true}.
     *
     * @throws Exception if the cleanup job fails
     */
    public void cleanupOldStorage() throws Exception {
        StorageCleanupJob cleanupJob = new StorageCleanupJob();
        cleanupJob.execute(new String[]{"--delete", "true"});
    }

    /**
     * Starts the embedded Kafka broker and prepares the topic.
     *
     * <p>The broker's ZooKeeper connection uses the Kylin ZK connect string with
     * {@code kafkaZkPath} appended. The topic is created with the arguments
     * {@code 3} and {@code 1}, then the method waits until it is ready and
     * asserts that the fetched topic metadata carries the expected name.
     *
     * @param topicName topic to create
     * @param server    broker address handed to {@code MockKafka}
     * @param brokerId  broker id handed to {@code MockKafka}
     */
    private void startEmbeddedKafka(String topicName, String server, int brokerId) {
        String zkConnectString = ZKUtil.getZKConnectString(KylinConfig.getInstanceFromEnv()) + this.kafkaZkPath;
        ZkConnection zk = new ZkConnection(zkConnectString);

        this.kafkaServer = new MockKafka(zk, server, brokerId);
        this.kafkaServer.start();

        this.kafkaServer.createTopic(topicName, 3, 1);
        this.kafkaServer.waitTopicUntilReady(topicName);

        MetadataResponse.TopicMetadata meta = this.kafkaServer.fetchTopicMeta(topicName);
        Assert.assertEquals((Object)topicName, (Object)meta.topic());
    }

    /**
     * Prepares the deployment environment by delegating to
     * {@code DeployUtil.overrideJobJarLocations()}.
     *
     * @throws Exception if the delegate call fails
     */
    private void deployEnv() throws Exception {
        DeployUtil.overrideJobJarLocations();
    }

    /** Cleans the {@code /stream} ZooKeeper path via {@code ZKUtil.cleanZkPath}. */
    public static void cleanStreamZkRoot() {
        final String streamRoot = "/stream";
        ZKUtil.cleanZkPath(streamRoot);
    }

    /**
     * Command-line entry point: runs the whole flow (class setup, per-run setup,
     * producing 10000 messages, waiting, querying), then tears everything down,
     * prints the elapsed time and exits.
     *
     * <p>The exit code is {@code 1} if any step of the main flow throws, otherwise
     * {@code 0}. Teardown stays best-effort and does not affect the exit code, but
     * its failures are now logged instead of being silently discarded, and
     * {@code cleanup()} runs even when {@code after()} fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int exitCode = 0;
        BuildCubeWithStreamV2 buildCubeWithStreamV2 = null;
        try {
            BuildCubeWithStreamV2.beforeClass();
            buildCubeWithStreamV2 = new BuildCubeWithStreamV2();
            buildCubeWithStreamV2.before();
            buildCubeWithStreamV2.produce(10000);
            buildCubeWithStreamV2.beforeQuery();
            buildCubeWithStreamV2.query();
        }
        catch (Throwable e) {
            logger.error("error", e);
            exitCode = 1;
        }
        finally {
            if (buildCubeWithStreamV2 != null) {
                // Separate try blocks so that one failing teardown step does not skip the other.
                try {
                    buildCubeWithStreamV2.after();
                }
                catch (Exception e) {
                    logger.error("Failed to shut down the streaming environment", e);
                }
                try {
                    buildCubeWithStreamV2.cleanup();
                }
                catch (Exception e) {
                    logger.error("Failed to clean up storage and test metadata", e);
                }
            }
            logger.info("Going to exit");
        }
        long millis = System.currentTimeMillis() - start;
        System.out.println("Time elapsed: " + millis / 1000L + " sec - in " + BuildCubeWithStreamV2.class.getName());
        System.exit(exitCode);
    }

    /**
     * In-process {@link ReceiverAdminClient} that forwards a few operations
     * straight to a local {@link StreamingServer} and ignores the rest.
     *
     * <p>Forwarded: {@code assign}, {@code unAssign}, {@code segmentBuildComplete}
     * and {@code addToReplicaSet}. Every other operation does nothing, and returns
     * {@code null} where a result is declared. The {@code receiver} argument is
     * never used.
     */
    static class MockedReceiverAdminClient
    implements ReceiverAdminClient {
        /** Local streaming server that receives the forwarded calls. */
        private final StreamingServer streamingServer;

        /**
         * @param streamingServer the local server to forward calls to
         */
        public MockedReceiverAdminClient(StreamingServer streamingServer) {
            this.streamingServer = streamingServer;
        }

        /**
         * Assigns the requested cube and partitions to the local server and, when the
         * request asks for it, starts the consumer in {@code ConsumerStartMode.LATEST}.
         */
        public void assign(Node receiver, AssignRequest assignRequest) throws IOException {
            this.streamingServer.assign(assignRequest.getCubeName(), assignRequest.getPartitions());
            if (!assignRequest.isStartConsumers()) {
                return;
            }
            ConsumerStartProtocol startProtocol = new ConsumerStartProtocol(ConsumerStartMode.LATEST);
            this.streamingServer.startConsumer(assignRequest.getCubeName(), startProtocol);
        }

        /** Unassigns the requested cube from the local server. */
        public void unAssign(Node receiver, UnAssignRequest unAssignRequest) throws IOException {
            String cube = unAssignRequest.getCube();
            this.streamingServer.unAssign(cube);
        }

        /** No-op. */
        public void startConsumers(Node receiver, StartConsumersRequest startRequest) throws IOException {
        }

        /** No-op; always returns {@code null}. */
        public ConsumerStatsResponse stopConsumers(Node receiver, StopConsumersRequest stopRequest) throws IOException {
            return null;
        }

        /** No-op; always returns {@code null}. */
        public ConsumerStatsResponse pauseConsumers(Node receiver, PauseConsumersRequest request) throws IOException {
            return null;
        }

        /** No-op; always returns {@code null}. */
        public ConsumerStatsResponse resumeConsumers(Node receiver, ResumeConsumerRequest request) throws IOException {
            return null;
        }

        /** No-op. */
        public void removeCubeSegment(Node receiver, String cubeName, String segmentName) throws IOException {
        }

        /** No-op. */
        public void makeCubeImmutable(Node receiver, String cubeName) throws IOException {
        }

        /** Tells the local server that the given segment of the cube has been built. */
        public void segmentBuildComplete(Node receiver, String cubeName, String segmentName) throws IOException {
            this.streamingServer.remoteSegmentBuildComplete(cubeName, segmentName);
        }

        /** Adds the local server to the given replica set. */
        public void addToReplicaSet(Node receiver, int replicaSetID) throws IOException {
            this.streamingServer.addToReplicaSet(replicaSetID);
        }

        /** No-op. */
        public void removeFromReplicaSet(Node receiver) throws IOException {
        }

        /** No-op; always returns {@code null}. */
        public ReceiverStats getReceiverStats(Node receiver) throws IOException {
            return null;
        }

        /** No-op; always returns {@code null}. */
        public ReceiverCubeStats getReceiverCubeStats(Node receiver, String cubeName) throws IOException {
            return null;
        }

        /** No-op; always returns {@code null}. */
        public HealthCheckInfo healthCheck(Node receiver) throws IOException {
            return null;
        }
    }
}

