package org.apache.kylin.streaming.jobs;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.metadata.cube.model.NCubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.CreateFlatTableEntry;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.Dataset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/CreateStreamingFlatTableTest.class */
public class CreateStreamingFlatTableTest extends StreamingTestCase {

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private static String PROJECT = "streaming_test";
    private static String DATAFLOW_ID = "511a9163-7888-4a60-aa24-ae735937cc87";

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    @Test
    public void testGenerateStreamingDataset() {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        instanceFromEnv.setProperty("kylin.streaming.kafka-conf.maxOffsetsPerTrigger", "100");
        instanceFromEnv.setProperty("kylin.streaming.kafka-conf.security.protocol", "SASL_PLAINTEXT");
        instanceFromEnv.setProperty("kylin.streaming.kafka-conf.sasl.mechanism", "PLAIN");
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(instanceFromEnv);
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        String[] strArr = {PROJECT, DATAFLOW_ID, "5", "10 seconds"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.setSparkSession(createSparkSession());
        NDataflow dataflow = NDataflowManager.getInstance(instanceFromEnv, PROJECT).getDataflow(DATAFLOW_ID);
        NCubeJoinedFlatTableDesc nCubeJoinedFlatTableDesc = new NCubeJoinedFlatTableDesc(dataflow.getIndexPlan());
        NDataSegment empty = NDataSegment.empty();
        empty.setId("test-1234");
        CreateStreamingFlatTable apply = CreateStreamingFlatTable.apply(new CreateFlatTableEntry(nCubeJoinedFlatTableDesc, empty, streamingEntry.createSpanningTree(dataflow), streamingEntry.getSparkSession(), (NBuildSourceInfo) null, "LO_PARTITIONCOLUMN", (String) null, "org.apache.kylin.parser.TimedJsonStreamParser"));
        Dataset generateStreamingDataset = apply.generateStreamingDataset(instanceFromEnv);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        Assert.assertEquals(4L, apply.lookupTablesGlobal().size());
        Assert.assertEquals(-1L, apply.tableRefreshInterval());
        Assert.assertFalse(apply.shouldRefreshTable());
        Assert.assertEquals(DATAFLOW_ID, apply.model().getId());
        Assert.assertNotNull(generateStreamingDataset);
        Assert.assertEquals(60L, generateStreamingDataset.encoder().schema().size());
    }
}
