package org.apache.kylin.streaming.metadata;

import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.streaming.constants.StreamingConstants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/kylin/streaming/metadata/StreamingJobMetaTest.class */
/**
 * Tests for {@link StreamingJobMeta#create}: checks the metadata fields copied from the
 * source {@link NDataModel} and the job parameters attached to build and merge jobs.
 */
public class StreamingJobMetaTest extends NLocalFileMetadataTestCase {
    /** Project that holds the model used by these tests. */
    private static final String PROJECT = "streaming_test";

    /** Identifier of the model the streaming jobs are created for. */
    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";

    /** JUnit rule for expected exceptions; no test in this class currently configures it. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Prepares the test metadata before each test. */
    @Before
    public void setUp() throws Exception {
        createTestMetadata();
    }

    /** Cleans up the test metadata after each test. */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * Creating a {@code STREAMING_BUILD} job must fill in the job metadata, the common
     * parameters, and the build-specific parameters (duration and max offsets per trigger).
     */
    @Test
    public void testCreateBuildJob() {
        NDataModel model = loadModel();
        JobStatusEnum status = JobStatusEnum.NEW;
        JobTypeEnum jobType = JobTypeEnum.STREAMING_BUILD;

        StreamingJobMeta jobMeta = StreamingJobMeta.create(model, status, jobType);
        Assert.assertNotNull(jobMeta);

        Map<String, String> params = jobMeta.getParams();
        assertJobMeta(model, jobMeta, status, jobType);
        assertCommonParams(params);
        Assert.assertEquals("30", params.get("kylin.streaming.duration"));
        Assert.assertEquals(StreamingConstants.STREAMING_MAX_OFFSETS_PER_TRIGGER_DEFAULT,
                params.get("kylin.streaming.kafka-conf.maxOffsetsPerTrigger"));
    }

    /**
     * Creating a {@code STREAMING_MERGE} job must fill in the job metadata, the common
     * parameters, and the merge-specific parameters (segment max size and merge threshold).
     */
    @Test
    public void testCreateMergeJob() {
        NDataModel model = loadModel();
        JobStatusEnum status = JobStatusEnum.NEW;
        JobTypeEnum jobType = JobTypeEnum.STREAMING_MERGE;

        StreamingJobMeta jobMeta = StreamingJobMeta.create(model, status, jobType);
        Assert.assertNotNull(jobMeta);

        Map<String, String> params = jobMeta.getParams();
        assertJobMeta(model, jobMeta, status, jobType);
        assertCommonParams(params);
        Assert.assertEquals("32m", params.get("kylin.streaming.segment-max-size"));
        Assert.assertEquals("3", params.get("kylin.streaming.segment-merge-threshold"));
    }

    /** Looks up the model {@link #MODEL_ID} in project {@link #PROJECT}. */
    private NDataModel loadModel() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        return NDataModelManager.getInstance(config, PROJECT).getDataModelDesc(MODEL_ID);
    }

    /**
     * Asserts that the job metadata mirrors the model it was created from.
     *
     * @param nDataModel       model the job was created for
     * @param streamingJobMeta job metadata under test
     * @param jobStatusEnum    status the job is expected to report
     * @param jobTypeEnum      job type, used to derive the expected job id
     */
    private void assertJobMeta(NDataModel nDataModel, StreamingJobMeta streamingJobMeta, JobStatusEnum jobStatusEnum, JobTypeEnum jobTypeEnum) {
        // A not-null check on the boxed primitive could never fail; check that a timestamp was actually set.
        Assert.assertTrue("create time should be set", streamingJobMeta.getCreateTime() > 0);
        Assert.assertNotNull(streamingJobMeta.getLastUpdateTime());
        Assert.assertEquals(jobStatusEnum, streamingJobMeta.getCurrentStatus());
        Assert.assertEquals(nDataModel.getUuid(), streamingJobMeta.getModelId());
        Assert.assertEquals(nDataModel.getAlias(), streamingJobMeta.getModelName());
        Assert.assertEquals(nDataModel.getRootFactTableName(), streamingJobMeta.getFactTableName());
        Assert.assertEquals(nDataModel.getRootFactTable().getTableDesc().getKafkaConfig().getSubscribe(),
                streamingJobMeta.getTopicName());
        Assert.assertEquals(nDataModel.getOwner(), streamingJobMeta.getOwner());
        Assert.assertEquals(StreamingUtils.getJobId(nDataModel.getUuid(), jobTypeEnum.name()),
                streamingJobMeta.getUuid());
    }

    /**
     * Asserts the Spark and retry parameters expected on both build and merge jobs.
     *
     * @param map job parameters returned by {@link StreamingJobMeta#getParams()}
     */
    private void assertCommonParams(Map<String, String> map) {
        // Fail with a clear assertion instead of an NPE when no parameters were attached.
        Assert.assertNotNull(map);
        Assert.assertFalse(map.isEmpty());
        Assert.assertEquals("yarn", map.get("spark.master"));
        Assert.assertEquals("512m", map.get("spark.driver.memory"));
        Assert.assertEquals("2", map.get("spark.executor.instances"));
        Assert.assertEquals("2", map.get("spark.executor.cores"));
        Assert.assertEquals("1g", map.get("spark.executor.memory"));
        Assert.assertEquals("8", map.get("spark.sql.shuffle.partitions"));
        Assert.assertEquals("false", map.get("kylin.streaming.job-retry-enabled"));
    }
}
