package org.apache.kylin.streaming.jobs;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTreeFactory;
import org.apache.kylin.metadata.cube.model.NCubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.CreateFlatTableEntry;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.common.MicroBatchEntry;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.mutable.ArrayBuffer;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingDFMergeJobTest.class */
/**
 * Exercises {@link StreamingDFMergeJob#streamingMergeSegment} against the {@code streaming_test}
 * project: four micro-batches are built into four segments, which are then merged into one.
 */
public class StreamingDFMergeJobTest extends StreamingTestCase {
    private static final Logger logger = LoggerFactory.getLogger(StreamingDFMergeJobTest.class);
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Creates the test metadata before each test. */
    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    /** Cleans up the test metadata after each test. */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * Removes every existing segment of the dataflow, processes four micro-batches (segment ranges
     * 0-10, 10-20, 20-30 and 30-40) and then merges the resulting segments.
     *
     * <p>A failure inside the merge job fails the test instead of only being logged.
     */
    @Test
    public void testStreamingMergeSegment() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        KylinBuildEnv.getOrCreate(config);
        NSparkKafkaSource source = createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        // Start from a dataflow without segments so the segment counts asserted below are exact.
        NDataflowManager dfMgr = NDataflowManager.getInstance(config, PROJECT);
        NDataflow original = dfMgr.getDataflow(DATAFLOW_ID);
        NDataflowUpdate removeAll = new NDataflowUpdate(original.getUuid());
        removeAll.setToRemoveSegsWithArray((NDataSegment[]) original.getSegments().toArray(new NDataSegment[0]));
        dfMgr.updateDataflow(removeAll);
        NDataflow emptyDf = dfMgr.getDataflow(original.getId());
        Assert.assertEquals(0L, emptyDf.getSegments().size());

        NCubeJoinedFlatTableDesc flatTableDesc = new NCubeJoinedFlatTableDesc(emptyDf.getIndexPlan());
        // Kept raw: the layout element type is not imported in this file.
        Set toBuildLayouts = StreamingUtils.getToBuildLayouts(emptyDf);
        Preconditions.checkState(CollectionUtils.isNotEmpty(toBuildLayouts), "layouts is empty", toBuildLayouts);
        NSpanningTree spanningTree = NSpanningTreeFactory.fromLayouts(toBuildLayouts, DATAFLOW_ID);
        SparkSession sparkSession = SparkSession.builder().master("local").appName("test").getOrCreate();
        CreateFlatTableEntry flatTableEntry = new CreateFlatTableEntry(flatTableDesc, (NDataSegment) null,
                spanningTree, sparkSession, (NBuildSourceInfo) null, (String) null, (String) null,
                "org.apache.kylin.parser.TimedJsonStreamParser");
        CreateStreamingFlatTable flatTable = new CreateStreamingFlatTable(flatTableEntry);
        Dataset streamingDataset = flatTable.generateStreamingDataset(config);

        StreamingDFBuildJob buildJob = new StreamingDFBuildJob(PROJECT);
        StreamingEntry streamingEntry = new StreamingEntry();
        streamingEntry.parseParams(new String[]{PROJECT, DATAFLOW_ID, "1000", "", "xx"});
        streamingEntry.setSparkSession(sparkSession);
        MicroBatchEntry microBatch = new MicroBatchEntry(streamingDataset, 0L,
                "SSB_TOPIC_0_DOT_0_LO_PARTITIONCOLUMN", flatTable, emptyDf, spanningTree, buildJob,
                createSegmentRange(0L, 10L, 3, 100L, 200L));
        ArrayBuffer batchBuffer = new ArrayBuffer(1);

        // Each processed micro-batch is expected to add exactly one segment to the dataflow.
        streamingEntry.processMicroBatch(microBatch, batchBuffer);
        Assert.assertEquals(1L, dfMgr.getDataflow(DATAFLOW_ID).getSegments().size());

        microBatch.setSegmentRange(createSegmentRange(10L, 20L, 3, 200L, 300L));
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        streamingEntry.processMicroBatch(microBatch, batchBuffer);
        Assert.assertEquals(2L, dfMgr.getDataflow(DATAFLOW_ID).getSegments().size());

        microBatch.setSegmentRange(createSegmentRange(20L, 30L, 3, 300L, 400L));
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        streamingEntry.processMicroBatch(microBatch, batchBuffer);
        Assert.assertEquals(3L, dfMgr.getDataflow(DATAFLOW_ID).getSegments().size());

        microBatch.setSegmentRange(createSegmentRange(30L, 40L, 3, 400L, 500L));
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        streamingEntry.processMicroBatch(microBatch, batchBuffer);
        NDataflow builtDf = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(4L, builtDf.getSegments().size());

        MergeJobEntry mergeEntry = createMergeJobEntry(dfMgr, builtDf, sparkSession, PROJECT);
        // Snapshot taken after the merge entry was created and before the merge job runs.
        NDataflow dfBeforeMerge = dfMgr.getDataflow(builtDf.getId());
        StreamingDFMergeJob mergeJob = new StreamingDFMergeJob();
        KylinBuildEnv.getOrCreate(config);
        try {
            mergeJob.streamingMergeSegment(mergeEntry);
        } catch (Exception e) {
            // Previously only logged, which let the test pass although the merge had failed.
            logger.error(e.getMessage(), e);
            throw new AssertionError("streaming merge failed for dataflow " + DATAFLOW_ID, e);
        }
        Assert.assertEquals(100L, mergeEntry.afterMergeSegmentSourceCount());
        Assert.assertEquals(5L, dfBeforeMerge.getSegments().size());
        Assert.assertEquals("1", dfBeforeMerge.getSegment(mergeEntry.afterMergeSegment().getId())
                .getAdditionalInfo().get("file_layer"));
    }

    /**
     * Builds the merge job entry for the READY and WARNING segments of {@code nDataflow}.
     *
     * <p>A merge segment for the range 0-40 is first registered on a copy of the current dataflow
     * through {@link NDataflowManager#mergeSegments}; the returned entry pairs it with the
     * segments looked up from {@code nDataflow}.
     *
     * @param nDataflowManager manager used to register the merge segment
     * @param nDataflow dataflow whose READY/WARNING segments are to be merged
     * @param sparkSession session handed to the merge entry
     * @param str project name handed to the merge entry
     * @return the merge job entry
     */
    @Override // org.apache.kylin.streaming.util.StreamingTestCase
    public MergeJobEntry createMergeJobEntry(NDataflowManager nDataflowManager, NDataflow nDataflow, SparkSession sparkSession, String str) {
        // Parameterised so the stream below is typed and needs no raw List cast.
        Segments<NDataSegment> candidates = nDataflow
                .getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.READY, SegmentStatusEnum.WARNING});
        SegmentRange.KafkaOffsetPartitionedSegmentRange mergedRange = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                0L, 40L, createKafkaPartitionsOffset(3, 100L), createKafkaPartitionsOffset(3, 500L));
        NDataSegment mergeSegment = nDataflowManager.mergeSegments(
                nDataflowManager.getDataflow(nDataflow.getId()).copy(), mergedRange, true, 1, (String) null);
        List<NDataSegment> toMerge = candidates.stream()
                .map(segment -> nDataflow.getSegment(segment.getId()))
                .collect(Collectors.toList());
        return new MergeJobEntry(sparkSession, str, nDataflow.getId(), 100L,
                new AtomicLong(System.currentTimeMillis()), toMerge, mergeSegment);
    }

    /**
     * Replaces the segments of the test dataflow with four READY streaming segments (ranges 0-10,
     * 10-20, 20-30 and 30-40) and attaches a data layout per segment for every layout of the
     * index plan.
     *
     * @return the dataflow re-read from the manager after the update
     */
    public NDataflow prepareSegment() {
        NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow original = dfMgr.getDataflow(DATAFLOW_ID);
        NIndexPlanManager indexPlanMgr = NIndexPlanManager.getInstance(getTestConfig(), PROJECT);

        NDataflowUpdate removeAll = new NDataflowUpdate(original.getUuid());
        removeAll.setToRemoveSegs((NDataSegment[]) original.getSegments().toArray(new NDataSegment[0]));
        dfMgr.updateDataflow(removeAll);
        NDataflow df = dfMgr.getDataflow(original.getId());

        NDataSegment seg1 = dfMgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                0L, 10L, createKafkaPartitionsOffset(3, 100L), createKafkaPartitionsOffset(3, 200L)));
        NDataSegment seg2 = dfMgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                10L, 20L, createKafkaPartitionsOffset(3, 200L), createKafkaPartitionsOffset(3, 300L)));
        NDataSegment seg3 = dfMgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                20L, 30L, createKafkaPartitionsOffset(3, 300L), createKafkaPartitionsOffset(3, 400L)));
        NDataSegment seg4 = dfMgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                30L, 40L, createKafkaPartitionsOffset(3, 400L), createKafkaPartitionsOffset(3, 500L)));
        seg1.setStatus(SegmentStatusEnum.READY);
        seg2.setStatus(SegmentStatusEnum.READY);
        seg3.setStatus(SegmentStatusEnum.READY);
        seg4.setStatus(SegmentStatusEnum.READY);

        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
        update.setToUpdateSegs(new NDataSegment[]{seg1, seg2, seg3, seg4});
        ArrayList<NDataLayout> layouts = Lists.newArrayList();
        indexPlanMgr.getIndexPlan(DATAFLOW_ID).getAllLayouts().forEach(layoutEntity -> {
            layouts.add(NDataLayout.newDataLayout(df, seg1.getId(), layoutEntity.getId()));
            layouts.add(NDataLayout.newDataLayout(df, seg2.getId(), layoutEntity.getId()));
            layouts.add(NDataLayout.newDataLayout(df, seg3.getId(), layoutEntity.getId()));
            layouts.add(NDataLayout.newDataLayout(df, seg4.getId(), layoutEntity.getId()));
        });
        update.setToAddOrUpdateLayouts(layouts.toArray(new NDataLayout[0]));
        dfMgr.updateDataflow(update);
        return dfMgr.getDataflow(df.getId());
    }
}
