/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.metadata.cube.model.NCubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.CreateFlatTableEntry;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.Dataset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/**
 * Tests for {@link CreateStreamingFlatTable}.
 *
 * <p>Each test runs between {@link #setUp()}, which creates the test metadata, and
 * {@link #tearDown()}, which cleans it up again.
 */
public class CreateStreamingFlatTableTest extends StreamingTestCase {
    /** Project name passed to {@link NDataflowManager#getInstance}. */
    private static final String PROJECT = "streaming_test";

    /** Id of the dataflow loaded by the test; also the expected model id. */
    private static final String DATAFLOW_ID = "511a9163-7888-4a60-aa24-ae735937cc87";

    /** JUnit expected-exception rule; no test in this class currently configures it. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Creates the test metadata before every test.
     *
     * @throws Exception declared for compatibility with the existing signature
     */
    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
    }

    /** Cleans up the test metadata after every test. */
    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    /**
     * Builds a {@link CreateStreamingFlatTable} for {@link #DATAFLOW_ID} and checks the
     * state it exposes after {@code generateStreamingDataset}: four global lookup
     * tables, a table refresh interval of {@code -1}, no table refresh requested, a
     * model whose id is {@link #DATAFLOW_ID}, and a non-null dataset whose encoder
     * schema has 60 fields.
     */
    @Test
    public void testGenerateStreamingDataset() {
        // Kafka-related settings must be in place before the source is created from the config.
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        config.setProperty("kylin.streaming.kafka-conf.maxOffsetsPerTrigger", "100");
        config.setProperty("kylin.streaming.kafka-conf.security.protocol", "SASL_PLAINTEXT");
        config.setProperty("kylin.streaming.kafka-conf.sasl.mechanism", "PLAIN");

        NSparkKafkaSource source = this.createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        // A spy keeps the real StreamingEntry behaviour; only the Spark session is injected.
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        entry.setSparkSession(this.createSparkSession());

        NDataflow dataflow = NDataflowManager.getInstance(config, PROJECT).getDataflow(DATAFLOW_ID);
        IJoinedFlatTableDesc flatTableDesc = new NCubeJoinedFlatTableDesc(dataflow.getIndexPlan());
        NDataSegment seg = NDataSegment.empty();
        seg.setId("test-1234");

        CreateFlatTableEntry flatTableEntry = new CreateFlatTableEntry(
                flatTableDesc,
                seg,
                entry.createSpanningTree(dataflow),
                entry.getSparkSession(),
                null,
                "LO_PARTITIONCOLUMN",
                null,
                "org.apache.kylin.parser.TimedJsonStreamParser");
        CreateStreamingFlatTable streamingFlatTable = CreateStreamingFlatTable.apply(flatTableEntry);

        Dataset<?> ds = streamingFlatTable.generateStreamingDataset(config);
        // Posted a second time, after the streaming dataset has been generated.
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        // The accessor return types are declared outside this file, so the actual values
        // keep an explicit (long) cast to pin the assertEquals(long, long) overload.
        Assert.assertEquals(4L, (long) streamingFlatTable.lookupTablesGlobal().size());
        Assert.assertEquals(-1L, (long) streamingFlatTable.tableRefreshInterval());
        Assert.assertFalse(streamingFlatTable.shouldRefreshTable());
        Assert.assertEquals(DATAFLOW_ID, streamingFlatTable.model().getId());
        Assert.assertNotNull(ds);
        Assert.assertEquals(60L, (long) ds.encoder().schema().size());
    }
}

