package org.apache.kylin.realtime;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.provision.MockKafka;
import org.apache.kylin.query.KylinTestBase;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.stream.coordinator.Coordinator;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.HealthCheckInfo;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
import org.apache.kylin.stream.core.model.StopConsumersRequest;
import org.apache.kylin.stream.core.model.UnAssignRequest;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.server.StreamingServer;
import org.apache.kylin.stream.source.kafka.KafkaSource;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/realtime/BuildCubeWithStreamV2.class */
public class BuildCubeWithStreamV2 extends KylinTestBase {
    /** SLF4J logger shared by the driver and its anonymous producer task. */
    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamV2.class);
    /** Name of the cube that this driver assigns, waits on and queries. */
    private static final String CUBE_NAME = "test_streaming_v2_user_info_cube";
    /** Kafka topic name, resolved in {@link #before()} from the cube's streaming source config. */
    private String topicName;
    /** Streaming receiver obtained from {@code StreamingServer.getInstance()} in the constructor. */
    private StreamingServer streamingServer;
    /** Job scheduler created and initialised in {@link #before()}. */
    private DefaultScheduler scheduler;
    /** Embedded Kafka broker started by {@code startEmbeddedKafka}; {@code null} until then. */
    private MockKafka kafkaServer;
    /** Byte-array producer created at the end of {@link #before()} and closed by the producer thread. */
    private KafkaProducer<byte[], byte[]> producer;
    /** ID of the replica set created in {@link #before()}. */
    private int replicaSetID;
    /**
     * ZooKeeper path made of the "streamingv2" root-based path plus a random UUID. It is appended to the
     * ZooKeeper connect string for the embedded Kafka and removed again in {@link #after()}.
     */
    private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streamingv2") + "/" + RandomUtil.randomUUID().toString();
    /** Relative path of the text file holding the sample messages. */
    private final String messageFile = "src/test/resources/streaming_v2_user_info_messages.txt";
    /** Set to {@code true} by the producer thread once it has finished sending; polled in {@link #beforeQuery()}. */
    private volatile boolean generateDataDone = false;
    /** Coordinator singleton; switched to leader and wired to the mocked receiver client in the constructor. */
    private Coordinator coordinator = Coordinator.getInstance();

    /* loaded from: input_file:org/apache/kylin/realtime/BuildCubeWithStreamV2$MockedReceiverAdminClient.class */
    /**
     * {@link ReceiverAdminClient} that skips any remote call and forwards a handful of operations
     * straight to an in-process {@link StreamingServer}. The {@code Node} argument is ignored by every
     * method. Operations that are not forwarded are no-ops, and those with a return value return
     * {@code null}.
     */
    static class MockedReceiverAdminClient implements ReceiverAdminClient {
        /** Receiver instance that forwarded calls are executed on. */
        private final StreamingServer server;

        /**
         * @param streamingServer the in-process receiver to delegate to
         */
        public MockedReceiverAdminClient(StreamingServer streamingServer) {
            this.server = streamingServer;
        }

        /**
         * Assigns the requested cube partitions to the receiver and, if the request asks for it,
         * starts consuming with {@link ConsumerStartMode#LATEST}.
         */
        public void assign(Node node, AssignRequest assignRequest) throws IOException {
            final String cubeName = assignRequest.getCubeName();
            server.assign(cubeName, assignRequest.getPartitions());
            if (!assignRequest.isStartConsumers()) {
                return;
            }
            final ConsumerStartProtocol startProtocol = new ConsumerStartProtocol(ConsumerStartMode.LATEST);
            server.startConsumer(cubeName, startProtocol);
        }

        /** Removes the cube named in the request from the receiver. */
        public void unAssign(Node node, UnAssignRequest unAssignRequest) throws IOException {
            final String cubeName = unAssignRequest.getCube();
            server.unAssign(cubeName);
        }

        /** No-op in this mock. */
        public void startConsumers(Node node, StartConsumersRequest startConsumersRequest) throws IOException {
            // intentionally empty
        }

        /** Not forwarded; always returns {@code null}. */
        public ConsumerStatsResponse stopConsumers(Node node, StopConsumersRequest stopConsumersRequest) throws IOException {
            return null;
        }

        /** Not forwarded; always returns {@code null}. */
        public ConsumerStatsResponse pauseConsumers(Node node, PauseConsumersRequest pauseConsumersRequest) throws IOException {
            return null;
        }

        /** Not forwarded; always returns {@code null}. */
        public ConsumerStatsResponse resumeConsumers(Node node, ResumeConsumerRequest resumeConsumerRequest) throws IOException {
            return null;
        }

        /** No-op in this mock. */
        public void removeCubeSegment(Node node, String str, String str2) throws IOException {
            // intentionally empty
        }

        /** No-op in this mock. */
        public void makeCubeImmutable(Node node, String str) throws IOException {
            // intentionally empty
        }

        /**
         * Passes both string arguments, in order, to {@code StreamingServer.remoteSegmentBuildComplete}.
         * NOTE(review): presumably cube name followed by segment name - confirm against the interface.
         */
        public void segmentBuildComplete(Node node, String str, String str2) throws IOException {
            server.remoteSegmentBuildComplete(str, str2);
        }

        /** Adds the receiver to the replica set identified by {@code i}. */
        public void addToReplicaSet(Node node, int i) throws IOException {
            server.addToReplicaSet(i);
        }

        /** No-op in this mock. */
        public void removeFromReplicaSet(Node node) throws IOException {
            // intentionally empty
        }

        /** Not forwarded; always returns {@code null}. */
        public ReceiverStats getReceiverStats(Node node) throws IOException {
            return null;
        }

        /** Not forwarded; always returns {@code null}. */
        public ReceiverCubeStats getReceiverCubeStats(Node node, String str) throws IOException {
            return null;
        }

        /** Not forwarded; always returns {@code null}. */
        public HealthCheckInfo healthCheck(Node node) throws IOException {
            return null;
        }
    }

    public BuildCubeWithStreamV2() {
        this.coordinator.setToLeader();
        this.streamingServer = StreamingServer.getInstance();
        this.streamingServer.setCoordinatorClient(this.coordinator);
        this.coordinator.setReceiverAdminClient(new MockedReceiverAdminClient(this.streamingServer));
    }

    /**
     * Runs {@link #beforeClass(String)} with {@link HBaseMetadataTestCase#SANDBOX_TEST_DATA} as the
     * configuration directory.
     *
     * @throws Exception if the environment set-up fails
     */
    public static void beforeClass() throws Exception {
        beforeClass(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
    }

    /**
     * Prepares the JVM-wide environment for the streaming build.
     *
     * <p>Adds the absolute form of {@code str} to the classpath, points {@code KYLIN_CONF} and
     * {@code kylin.hadoop.conf.dir} at {@code str}, requires the {@code hdp.version} system property,
     * enables stream stand-alone mode, calls {@code setupAll()}, creates the HDFS working directory
     * and finally clears the stream ZooKeeper root.
     *
     * @param str configuration directory to use
     * @throws RuntimeException if {@code hdp.version} is not set, or if creating the working directory
     *                          (or the subsequent ZooKeeper clean-up) fails with an {@link IOException}
     * @throws Exception        if any other set-up step fails
     */
    public static void beforeClass(String str) throws Exception {
        final String absoluteConfDir = new File(str).getAbsolutePath();
        logger.info("Adding to classpath: " + absoluteConfDir);
        ClassUtil.addClasspath(absoluteConfDir);

        System.setProperty("KYLIN_CONF", str);
        System.setProperty("kylin.hadoop.conf.dir", str);

        final String hdpVersion = System.getProperty("hdp.version");
        if (StringUtils.isEmpty(hdpVersion)) {
            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
        }

        System.setProperty("kylin.stream.stand-alone.mode", "true");
        setupAll();

        try {
            final boolean created = HadoopUtil.getWorkingFileSystem()
                    .mkdirs(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
            if (!created) {
                throw new IOException("mkdir fails, please check hdfs permission");
            }
            cleanStreamZkRoot();
        } catch (IOException e) {
            final String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
            throw new RuntimeException("failed to create kylin.env.hdfs-working-dir, Please make sure the user has right to access " + workingDir, e);
        }
    }

    /** Class-level teardown: delegates to {@code HBaseMetadataTestCase.staticCleanupTestMetadata()}. */
    public static void afterClass() {
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    /**
     * Brings up everything a single run needs, in this order: job-jar overrides, the job scheduler,
     * the embedded Kafka broker with the cube's topic, the streaming server, a one-node replica set,
     * the cube assignment, and finally the Kafka producer used by {@link #produce(int)}.
     *
     * @throws RuntimeException if the scheduler reports that it has not started
     * @throws Exception        if any other set-up step fails
     */
    public void before() throws Exception {
        deployEnv();

        // Scheduler must be running before the cube is assigned.
        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        this.scheduler = DefaultScheduler.createInstance();
        this.scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
        if (!this.scheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }

        // Topic and brokers come from the streaming source config of the cube's root fact table.
        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
        final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig)
                .getConfig(cubeInstance.getRootFactTable(), cubeInstance.getProject());
        this.topicName = KafkaSource.getTopicName(sourceConfig.getProperties());
        final String brokerList = KafkaSource.getBootstrapServers(sourceConfig.getProperties());

        // Only the first broker address is used for the embedded Kafka instance.
        final String firstBroker = brokerList.split(",")[0];
        startEmbeddedKafka(this.topicName, firstBroker, 0);

        this.streamingServer.start();

        final ReplicaSet singleNodeSet = new ReplicaSet();
        singleNodeSet.addNode(new Node());
        this.coordinator.createReplicaSet(singleNodeSet);
        this.replicaSetID = singleNodeSet.getReplicaSetID();
        this.streamingServer.becomeLeader();

        CubeManager.getInstance(kylinConfig).updateCubeStatus(cubeInstance, RealizationStatusEnum.READY);
        this.coordinator.assignCube(CUBE_NAME);

        // Keys and values are sent as raw bytes.
        final String byteArraySerializer = ByteArraySerializer.class.getName();
        final Properties producerOverrides = new Properties();
        producerOverrides.put("key.serializer", byteArraySerializer);
        producerOverrides.put("value.serializer", byteArraySerializer);
        this.producer = new KafkaProducer<>(
                Kafka10DataLoader.constructDefaultKafkaProducerProperties(brokerList, producerOverrides));
    }

    /**
     * Starts a background thread that reads the sample message file line by line and sends each line
     * to the Kafka topic, pausing 50 ms between messages.
     *
     * <p>The record key is the zero-based message index and the value is the line, both UTF-8 encoded.
     * Sending stops after {@code i} messages or at end of file, whichever comes first. When sending
     * finishes normally {@code generateDataDone} is set to {@code true}; on failure the flag stays
     * {@code false} and the error is logged, so {@link #beforeQuery()} times out with the cause visible
     * in the log. The producer is closed by the background thread in every case.
     *
     * @param i maximum number of messages to send
     * @throws IOException declared for compatibility; file errors occur on the background thread and
     *                     are logged there rather than thrown to the caller
     */
    public void produce(final int i) throws IOException {
        final Runnable sendTask = new Runnable() {
            @Override
            public void run() {
                try (BufferedReader reader = Files.newBufferedReader(Paths.get(messageFile))) {
                    logger.info("Generate data start");
                    int sent = 0;
                    String line;
                    while (sent < i && (line = reader.readLine()) != null) {
                        final byte[] key = String.valueOf(sent).getBytes(StandardCharsets.UTF_8);
                        final byte[] value = line.getBytes(StandardCharsets.UTF_8);
                        producer.send(new ProducerRecord<byte[], byte[]>(topicName, key, value));
                        sent++;
                        Thread.sleep(50L);
                    }
                    generateDataDone = true;
                    logger.info("Generate data done");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so whoever owns this thread can observe it.
                    Thread.currentThread().interrupt();
                    logger.error("Generate data interrupted", e);
                } catch (Exception e) {
                    // Previously swallowed silently; the flag stays false so the waiter will time out.
                    logger.error("Generate data failed", e);
                } finally {
                    if (producer != null) {
                        producer.close();
                    }
                }
            }
        };
        new Thread(sendTask).start();
    }

    /**
     * Blocks until the pipeline is ready to be queried, or fails with a timeout.
     *
     * <p>Three phases follow a fixed initial sleep of 500 seconds:
     * <ol>
     *   <li>wait for the producer thread to finish (up to 2 sleeps of 60 s);</li>
     *   <li>wait until {@code isConsumeDataDone()} succeeds (up to 6 checks, 60 s apart);</li>
     *   <li>wait until {@code isSegmentBuildSuccess()} is true (up to 6 checks, 120 s apart).</li>
     * </ol>
     *
     * @throws IllegalStateException if any phase does not complete within its retry budget
     * @throws Exception             if a check or a sleep fails
     */
    public void beforeQuery() throws Exception {
        Thread.sleep(500000L);

        // Phase 1: producer thread finished sending.
        int producerWaits = 0;
        while (!this.generateDataDone && producerWaits < 2) {
            Thread.sleep(60000L);
            producerWaits++;
        }
        if (!this.generateDataDone) {
            throw new IllegalStateException("Timeout when wait all messages be sent to Kafka");
        }

        // Phase 2: every sent message has been consumed.
        boolean consumed = isConsumeDataDone();
        for (int retry = 0; !consumed && retry < 5; retry++) {
            Thread.sleep(60000L);
            consumed = isConsumeDataDone();
        }
        if (!consumed) {
            throw new IllegalStateException("Exec timeout, data not be consumed completely");
        }

        // Phase 3: at least one segment is ready.
        boolean segmentReady = isSegmentBuildSuccess();
        for (int retry = 0; !segmentReady && retry < 5; retry++) {
            Thread.sleep(120000L);
            segmentReady = isSegmentBuildSuccess();
        }
        if (!segmentReady) {
            throw new IllegalStateException("Build failed/timeout, no ready segment");
        }
    }

    /**
     * Runs the streaming-v2 query sets: first the set whose results are compared, then (only if that
     * succeeded) the set that is merely executed.
     *
     * @throws IllegalStateException if either query set fails; the underlying error is written to the
     *                               log by the helper that ran it
     * @throws Exception             declared by the query helpers
     */
    public void query() throws Exception {
        final boolean comparedOk = execAndCompSuccess("src/test/resources/query/sql_streaming_v2/compare_result", null, true);
        // Short-circuit kept: the second set is skipped when the compared set already failed.
        final boolean allOk = comparedOk && execSuccess("src/test/resources/query/sql_streaming_v2/not_compare_result");
        if (!allOk) {
            // The old message was copied from beforeQuery() and wrongly blamed the segment build.
            throw new IllegalStateException("Query failed, streaming v2 query execution or result comparison did not succeed");
        }
    }

    /**
     * Tears down what {@link #before()} created: un-assigns the cube, removes the receiver from its
     * replica set, stops the embedded Kafka broker if one was started, removes the Kafka ZooKeeper
     * path and destroys the scheduler instance.
     *
     * @throws Exception if any teardown step fails; later steps are then not executed
     */
    public void after() throws Exception {
        this.coordinator.unAssignCube(CUBE_NAME);
        this.streamingServer.removeFromReplicaSet();

        final MockKafka embeddedKafka = this.kafkaServer;
        if (embeddedKafka != null) {
            embeddedKafka.stop();
        }

        ZKUtil.cleanZkPath(this.kafkaZkPath);
        DefaultScheduler.destroyInstance();
    }

    /**
     * Final clean-up: runs {@link #cleanupOldStorage()} and then
     * {@code HBaseMetadataTestCase.staticCleanupTestMetadata()}.
     *
     * @throws Exception if the storage clean-up fails; the metadata clean-up is then skipped
     */
    public void cleanup() throws Exception {
        cleanupOldStorage();
        HBaseMetadataTestCase.staticCleanupTestMetadata();
    }

    /**
     * Calls {@code execAndCompQuery} with the given arguments and turns its outcome into a boolean
     * instead of an exception.
     *
     * @param str    passed through as the first argument (the callers in this class pass a query
     *               resource path)
     * @param strArr passed through unchanged; callers in this class pass {@code null}
     * @param z      passed through unchanged
     * @return {@code true} if the call returned normally, {@code false} if it threw an
     *         {@link Exception} (which is logged)
     */
    private boolean execAndCompSuccess(String str, String[] strArr, boolean z) throws Exception {
        boolean succeeded;
        try {
            execAndCompQuery(str, strArr, z);
            succeeded = true;
        } catch (Exception e) {
            logger.error("Exec query and compare result failed", e);
            succeeded = false;
        }
        return succeeded;
    }

    /**
     * Calls {@code batchExecuteQuery} and reports success as a boolean instead of an exception.
     *
     * @param str passed through as the only argument (the caller in this class passes a query
     *            resource path)
     * @return {@code true} if the call returned normally, {@code false} if it threw an
     *         {@link Exception} (which is logged)
     */
    private boolean execSuccess(String str) throws Exception {
        boolean succeeded;
        try {
            batchExecuteQuery(str);
            succeeded = true;
        } catch (Exception e) {
            logger.error("Exec query failed", e);
            succeeded = false;
        }
        return succeeded;
    }

    /**
     * Checks consumption progress by running the "count" query set through
     * {@code execAndCompSuccess}.
     *
     * @return {@code true} if the count queries executed and compared successfully
     */
    private boolean isConsumeDataDone() throws Exception {
        final String countQueries = "src/test/resources/query/sql_streaming_v2/count";
        return execAndCompSuccess(countQueries, null, false);
    }

    /**
     * Looks up the cube in the current environment's {@link CubeManager}.
     *
     * @return {@code true} if the cube has a latest ready segment, {@code false} otherwise
     */
    private boolean isSegmentBuildSuccess() {
        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
        return cubeInstance.getLatestReadySegment() != null;
    }

    /**
     * Runs a {@link StorageCleanupJob} with the arguments {@code --delete true}.
     *
     * @throws Exception if the job fails
     */
    public void cleanupOldStorage() throws Exception {
        final StorageCleanupJob cleanupJob = new StorageCleanupJob();
        final String[] jobArgs = {"--delete", "true"};
        cleanupJob.execute(jobArgs);
    }

    /**
     * Starts a {@link MockKafka} broker under this run's private ZooKeeper path, creates the topic
     * with 3 partitions and replication factor 1, waits until it is ready and asserts that the
     * broker reports metadata for it.
     *
     * @param str  topic name to create
     * @param str2 passed through to the {@code MockKafka} constructor; the caller supplies the first
     *             bootstrap server address
     * @param i    passed through to the {@code MockKafka} constructor; the caller supplies {@code 0}
     */
    private void startEmbeddedKafka(String str, String str2, int i) {
        final String zkConnectString = ZKUtil.getZKConnectString(KylinConfig.getInstanceFromEnv()) + this.kafkaZkPath;
        final ZkConnection zkConnection = new ZkConnection(zkConnectString);

        this.kafkaServer = new MockKafka(zkConnection, str2, i);
        this.kafkaServer.start();
        this.kafkaServer.createTopic(str, 3, 1);
        this.kafkaServer.waitTopicUntilReady(str);

        final String reportedTopic = this.kafkaServer.fetchTopicMeta(str).topic();
        Assert.assertEquals(str, reportedTopic);
    }

    /** Delegates to {@code DeployUtil.overrideJobJarLocations()}; first step of {@link #before()}. */
    private void deployEnv() throws Exception {
        DeployUtil.overrideJobJarLocations();
    }

    /** Removes the {@code /stream} path from ZooKeeper via {@code ZKUtil.cleanZkPath}. */
    public static void cleanStreamZkRoot() {
        final String streamRoot = "/stream";
        ZKUtil.cleanZkPath(streamRoot);
    }

    /**
     * Command-line entry point: sets up the environment, produces 10000 messages, waits for
     * consumption and segment build, runs the queries, tears everything down and exits the JVM.
     *
     * <p>The exit code is {@code 0} when every step up to and including the queries succeeded and
     * {@code 1} otherwise. Teardown is best-effort: a failure there is logged but does not change the
     * exit code.
     *
     * @param strArr command-line arguments; not used
     */
    public static void main(String[] strArr) {
        final long startMillis = System.currentTimeMillis();
        int exitCode = 0;
        BuildCubeWithStreamV2 driver = null;
        try {
            beforeClass();
            driver = new BuildCubeWithStreamV2();
            driver.before();
            driver.produce(10000);
            driver.beforeQuery();
            driver.query();
        } catch (Throwable th) {
            logger.error("error", th);
            exitCode = 1;
        } finally {
            // driver is null when set-up failed before the instance was created; nothing to tear down then.
            if (driver != null) {
                try {
                    driver.after();
                    driver.cleanup();
                } catch (Exception e) {
                    // Previously swallowed silently; keep it best-effort but leave a trace.
                    logger.warn("Failed to tear down the streaming build environment", e);
                }
            }
            logger.info("Going to exit");
        }
        System.out.println("Time elapsed: " + ((System.currentTimeMillis() - startMillis) / 1000) + " sec - in " + BuildCubeWithStreamV2.class.getName());
        System.exit(exitCode);
    }
}
