package org.apache.kylin.streaming.jobs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTreeFactory;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.BuildJobEntry;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import scala.Tuple3;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingDFBuildJobTest.class */
/**
 * Tests for {@link StreamingDFBuildJob}, run against the test metadata of the
 * {@code streaming_test} project.
 *
 * <p>Test metadata is created before and cleaned up after every test method.
 */
public class StreamingDFBuildJobTest extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "4965c827-fbb4-4ea1-a744-3f341a3b030d";

    /** Dataflow id and segment id looked up by {@link #testGetSegment()}. */
    private static final String SEGMENT_DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
    private static final String SEGMENT_ID = "c380dd2a-43b8-4268-b73d-2a5f76236632";

    /** Lets a test declare the exception it must end with; the test fails if none is thrown. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * Runs a streaming build for one freshly appended segment and checks that:
     * <ul>
     *   <li>the dataflow is OFFLINE before the build and ONLINE after it;</li>
     *   <li>the segment keeps its 4 layouts while its storage file count and byte size grow;</li>
     *   <li>after the dataflow is forced OFFLINE again, {@code updateSegment} leaves it ONLINE
     *       when the REST call is replaced by a stub that flips the status locally.</li>
     * </ul>
     *
     * @throws Exception any failure is propagated unchanged so JUnit reports the original
     *                   exception type, cause and stack trace
     */
    @Test
    public void testStreamingBuild() throws Exception {
        KylinConfig config = getTestConfig();
        KylinBuildEnv.getOrCreate(config);
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(config);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        // Drop every existing segment so the build starts from an empty dataflow.
        NDataflowManager dfManager = NDataflowManager.getInstance(config, PROJECT);
        NDataflow initialDataflow = dfManager.getDataflow(DATAFLOW_ID);
        NDataflowUpdate removeSegments = new NDataflowUpdate(initialDataflow.getUuid());
        removeSegments.setToRemoveSegs(
                (NDataSegment[]) initialDataflow.getSegments().toArray(new NDataSegment[0]));
        dfManager.updateDataflow(removeSegments);

        NDataflow emptyDataflow = dfManager.getDataflow(initialDataflow.getId());
        NSpanningTree spanningTree = NSpanningTreeFactory
                .fromLayouts(StreamingUtils.getToBuildLayouts(emptyDataflow), DATAFLOW_ID);
        NDataModel model = emptyDataflow.getModel();
        StreamingDFBuildJob buildJob = Mockito.spy(new StreamingDFBuildJob(PROJECT));

        StreamingEntry entry = new StreamingEntry();
        entry.parseParams(new String[]{PROJECT, DATAFLOW_ID, "3000", "", "xx"});
        SparkSession sparkSession = createSparkSession();
        entry.setSparkSession(sparkSession);
        // Raw types are kept here: the tuple's element types are not visible from this file.
        Tuple3 streamQuery = entry.generateStreamQueryForOneModel();
        Dataset batchDataset = (Dataset) streamQuery._1();
        CreateStreamingFlatTable flatTable = (CreateStreamingFlatTable) streamQuery._3();

        // Append a READY streaming segment and register one data layout per index-plan layout.
        NDataSegment segment = dfManager.appendSegmentForStreaming(emptyDataflow,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 10L,
                        createKafkaPartitionsOffset(3, 100L), createKafkaPartitionsOffset(3, 200L)));
        segment.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate addSegment = new NDataflowUpdate(emptyDataflow.getUuid());
        addSegment.setToUpdateSegs(new NDataSegment[]{segment});
        ArrayList<NDataLayout> dataLayouts = Lists.newArrayList();
        NIndexPlanManager.getInstance(getTestConfig(), PROJECT).getIndexPlan(DATAFLOW_ID).getAllLayouts()
                .forEach(layout -> dataLayouts
                        .add(NDataLayout.newDataLayout(emptyDataflow, segment.getId(), layout.getId())));
        addSegment.setToAddOrUpdateLayouts(dataLayouts.toArray(new NDataLayout[0]));
        dfManager.updateDataflow(addSegment);

        flatTable.seg_$eq(segment);
        final BuildJobEntry buildJobEntry = new BuildJobEntry(sparkSession, PROJECT, DATAFLOW_ID, 100L, segment,
                flatTable.encodeStreamingDataset(segment, model, batchDataset), spanningTree);

        // State before the build.
        final NDataflowManager envDfManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
                PROJECT);
        NDataflow beforeBuild = envDfManager.getDataflow(buildJobEntry.dataflowId());
        Assert.assertEquals(RealizationStatusEnum.OFFLINE, beforeBuild.getStatus());
        Assert.assertEquals(4, beforeBuild.getSegment(segment.getId()).getLayoutsMap().size());
        long fileCountBefore = beforeBuild.getSegment(segment.getId()).getStorageFileCount();
        long bytesBefore = beforeBuild.getSegment(segment.getId()).getStorageBytesSize();

        buildJob.streamBuild(buildJobEntry);

        // State after the build: online, same layouts, more stored data.
        NDataflow afterBuild = envDfManager.getDataflow(buildJobEntry.dataflowId());
        Assert.assertEquals(RealizationStatusEnum.ONLINE, afterBuild.getStatus());
        Assert.assertEquals(4, afterBuild.getSegment(segment.getId()).getLayoutsMap().size());
        Assert.assertTrue(afterBuild.getSegment(segment.getId()).getStorageFileCount() > fileCountBefore);
        Assert.assertTrue(afterBuild.getSegment(segment.getId()).getStorageBytesSize() > bytesBefore);

        // Force the dataflow offline, then let updateSegment go through the stubbed REST layer.
        envDfManager.updateDataflow(buildJobEntry.dataflowId(),
                toUpdate -> toUpdate.setStatus(RealizationStatusEnum.OFFLINE));
        RestSupport localRestStub = new RestSupport(config) {
            @Override
            public RestResponse execute(HttpRequestBase httpRequestBase, Object obj) {
                // Stand-in for the remote call: flip the status locally and report success.
                envDfManager.updateDataflow(buildJobEntry.dataflowId(),
                        toUpdate -> toUpdate.setStatus(RealizationStatusEnum.ONLINE));
                return RestResponse.ok();
            }
        };
        // doReturn(..).when(spy) stubs without invoking the real createRestSupport();
        // when(spy.createRestSupport()) would call the real method once while stubbing.
        Mockito.doReturn(localRestStub).when(buildJob).createRestSupport();

        buildJob.updateSegment(buildJobEntry);
        Assert.assertEquals(RealizationStatusEnum.ONLINE,
                envDfManager.getDataflow(buildJobEntry.dataflowId()).getStatus());
    }

    /**
     * Checks that {@code getSegment} returns the segment with the requested id once the
     * {@code dataflowId} parameter has been set on the job.
     */
    @Test
    public void testGetSegment() {
        KylinConfig config = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(config);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        // The result is unused; the lookup is kept as in the original test.
        NDataflowManager.getInstance(config, PROJECT).getDataflow(SEGMENT_DATAFLOW_ID);

        StreamingDFBuildJob buildJob = new StreamingDFBuildJob(PROJECT);
        buildJob.setParam("dataflowId", SEGMENT_DATAFLOW_ID);
        NDataSegment segment = buildJob.getSegment(SEGMENT_ID);

        Assert.assertNotNull(segment);
        Assert.assertEquals(SEGMENT_ID, segment.getId());
    }

    /**
     * Checks that submitting a job to the {@code buildLayoutWithUpdate} field after
     * {@code shutdown()} is rejected with {@link RejectedExecutionException}.
     *
     * <p>The test fails if the submit succeeds or throws anything else; a plain
     * try/catch around the call would pass silently when nothing is thrown.
     *
     * @throws Exception any unexpected failure, reported by JUnit as-is
     */
    @Test
    public void testShutdown() throws Exception {
        StreamingDFBuildJob buildJob = new StreamingDFBuildJob(PROJECT);
        buildJob.shutdown();
        BuildLayoutWithUpdate layoutBuilder = (BuildLayoutWithUpdate) ReflectionUtils.getField(buildJob,
                "buildLayoutWithUpdate");
        BuildLayoutWithUpdate.JobEntity noOpJob = new BuildLayoutWithUpdate.JobEntity() {
            @Override
            public long getIndexId() {
                return 0L;
            }

            @Override
            public String getName() {
                return null;
            }

            @Override
            public List<NDataLayout> build() throws IOException {
                return null;
            }
        };

        // Armed only now, so an exception from the setup above cannot satisfy the expectation.
        thrown.expect(RejectedExecutionException.class);
        layoutBuilder.submit(noOpJob, getTestConfig());
    }
}
