/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTreeFactory;
import org.apache.kylin.metadata.cube.model.NCubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.CreateFlatTableEntry;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.common.MicroBatchEntry;
import org.apache.kylin.streaming.jobs.StreamingDFBuildJob;
import org.apache.kylin.streaming.jobs.StreamingDFMergeJob;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.mutable.ArrayBuffer;

/**
 * Tests {@link StreamingDFMergeJob#streamingMergeSegment} on the {@code streaming_test} project:
 * four micro-batch segments are built from the SSB streaming fixture and then merged into one.
 */
public class StreamingDFMergeJobTest extends StreamingTestCase {
    private static final Logger logger = LoggerFactory.getLogger(StreamingDFMergeJobTest.class);
    /** Project containing the streaming dataflow under test. */
    private static final String PROJECT = "streaming_test";
    /** Id of the streaming dataflow whose segments are built and merged. */
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
    /** JUnit expected-exception rule; not referenced by the tests in this class. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    /**
     * Builds four consecutive micro-batch segments over segment ranges [0, 10), [10, 20), [20, 30)
     * and [30, 40), merges them, and checks the merge result.
     *
     * <p>Any exception thrown by the merge is logged and rethrown, so a failed merge fails the test
     * instead of silently skipping the assertions.
     */
    @Test
    public void testStreamingMergeSegment() throws Exception {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        KylinBuildEnv.getOrCreate(config);
        NSparkKafkaSource source = this.createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        // Remove all existing segments so the segment counts asserted below start from zero.
        NDataflowManager dfMgr = NDataflowManager.getInstance(config, PROJECT);
        NDataflow df = dfMgr.getDataflow(DATAFLOW_ID);
        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
        update.setToRemoveSegsWithArray(df.getSegments().toArray(new NDataSegment[0]));
        dfMgr.updateDataflow(update);
        df = dfMgr.getDataflow(df.getId());
        Assert.assertEquals(0L, (long) df.getSegments().size());

        // Build the streaming flat table over the layouts that still need to be built.
        NCubeJoinedFlatTableDesc flatTableDesc = new NCubeJoinedFlatTableDesc(df.getIndexPlan());
        Set layouts = StreamingUtils.getToBuildLayouts(df);
        Preconditions.checkState(CollectionUtils.isNotEmpty(layouts), "layouts is empty", layouts);
        NSpanningTree nSpanningTree = NSpanningTreeFactory.fromLayouts(layouts, DATAFLOW_ID);
        SparkSession ss = SparkSession.builder().master("local").appName("test").getOrCreate();
        CreateFlatTableEntry flatTableEntry = new CreateFlatTableEntry((IJoinedFlatTableDesc) flatTableDesc, null,
                nSpanningTree, ss, null, null, null, "org.apache.kylin.parser.TimedJsonStreamParser");
        CreateStreamingFlatTable flatTable = new CreateStreamingFlatTable(flatTableEntry);
        Dataset dataset = flatTable.generateStreamingDataset(config);

        StreamingDFBuildJob builder = new StreamingDFBuildJob(PROJECT);
        StreamingEntry streamingEntry = new StreamingEntry();
        streamingEntry.parseParams(new String[] {PROJECT, DATAFLOW_ID, "1000", "", "xx"});
        streamingEntry.setSparkSession(ss);

        // Batch 1: segment range [0, 10).
        SegmentRange.KafkaOffsetPartitionedSegmentRange sr1 = this.createSegmentRange(0L, 10L, 3, 100L, 200L);
        MicroBatchEntry microBatchEntry = new MicroBatchEntry(dataset, 0L, "SSB_TOPIC_0_DOT_0_LO_PARTITIONCOLUMN",
                flatTable, df, nSpanningTree, builder, sr1);
        ArrayBuffer minMaxBuffer = new ArrayBuffer(1);
        streamingEntry.processMicroBatch(microBatchEntry, minMaxBuffer);
        df = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(1L, (long) df.getSegments().size());

        // Batches 2-4 reuse the same entry with the next range; each run adds one segment.
        SegmentRange.KafkaOffsetPartitionedSegmentRange sr2 = this.createSegmentRange(10L, 20L, 3, 200L, 300L);
        microBatchEntry.setSegmentRange(sr2);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        streamingEntry.processMicroBatch(microBatchEntry, minMaxBuffer);
        df = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(2L, (long) df.getSegments().size());

        SegmentRange.KafkaOffsetPartitionedSegmentRange sr3 = this.createSegmentRange(20L, 30L, 3, 300L, 400L);
        microBatchEntry.setSegmentRange(sr3);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        streamingEntry.processMicroBatch(microBatchEntry, minMaxBuffer);
        df = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(3L, (long) df.getSegments().size());

        SegmentRange.KafkaOffsetPartitionedSegmentRange sr4 = this.createSegmentRange(30L, 40L, 3, 400L, 500L);
        microBatchEntry.setSegmentRange(sr4);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        streamingEntry.processMicroBatch(microBatchEntry, minMaxBuffer);
        df = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(4L, (long) df.getSegments().size());

        MergeJobEntry mergeJobEntry = this.createMergeJobEntry(dfMgr, df, ss, PROJECT);
        // NOTE(review): df is reloaded here, before the merge runs, and the assertions below read this
        // snapshot (presumably the 4 built segments plus the segment created by mergeSegments in
        // createMergeJobEntry). Confirm that checking this pre-merge state is intended.
        df = dfMgr.getDataflow(df.getId());
        StreamingDFMergeJob merger = new StreamingDFMergeJob();
        KylinBuildEnv.getOrCreate(config);
        try {
            merger.streamingMergeSegment(mergeJobEntry);
        } catch (Exception e) {
            // Log for diagnostics, then fail the test: a swallowed merge error would let it pass.
            logger.error(e.getMessage(), e);
            throw e;
        }
        Assert.assertEquals(100L, (long) mergeJobEntry.afterMergeSegmentSourceCount());
        Assert.assertEquals(5L, (long) df.getSegments().size());
        Assert.assertEquals("1",
                df.getSegment(mergeJobEntry.afterMergeSegment().getId()).getAdditionalInfo().get("file_layer"));
    }

    /**
     * Creates a merge entry for segment range [0, 40) over the READY and WARNING segments of {@code df}.
     *
     * @param mgr dataflow manager used to create the merged segment
     * @param df dataflow whose segments are merged
     * @param ss Spark session handed to the merge entry
     * @param project project name handed to the merge entry
     * @return the merge entry, with the merged segment returned by {@code mgr.mergeSegments}
     */
    @Override
    protected MergeJobEntry createMergeJobEntry(NDataflowManager mgr, NDataflow df, SparkSession ss, String project) {
        // Typed so that the lambda below can call getId(); with the raw Segments type it cannot.
        Segments<NDataSegment> retainSegments = df.getSegments(
                new SegmentStatusEnum[] {SegmentStatusEnum.READY, SegmentStatusEnum.WARNING});
        SegmentRange.KafkaOffsetPartitionedSegmentRange rangeToMerge = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(0L), Long.valueOf(40L),
                this.createKafkaPartitionsOffset(3, 100L), this.createKafkaPartitionsOffset(3, 500L));
        // The merge is registered against a copy of the current dataflow.
        // NOTE(review): the Integer argument looks like the file layer (the test expects
        // "file_layer" == "1"); confirm against NDataflowManager.mergeSegments.
        NDataflow copy = mgr.getDataflow(df.getId()).copy();
        NDataSegment afterMergeSeg = mgr.mergeSegments(copy, (SegmentRange) rangeToMerge, true, Integer.valueOf(1), null);
        List<NDataSegment> updatedSegments = retainSegments.stream()
                .map(seg -> df.getSegment(seg.getId()))
                .collect(Collectors.toList());
        AtomicLong globalMergeTime = new AtomicLong(System.currentTimeMillis());
        return new MergeJobEntry(ss, project, df.getId(), 100L, globalMergeTime, updatedSegments, afterMergeSeg);
    }

    /**
     * Replaces all segments of the test dataflow with four READY streaming segments.
     *
     * <p>The segments cover ranges [0, 10), [10, 20), [20, 30) and [30, 40). Each segment gets a
     * data layout for every layout of the dataflow's index plan.
     *
     * @return the dataflow reloaded after the update
     */
    public NDataflow prepareSegment() {
        NDataflowManager mgr = NDataflowManager.getInstance(StreamingDFMergeJobTest.getTestConfig(), PROJECT);
        NDataflow df = mgr.getDataflow(DATAFLOW_ID);
        NIndexPlanManager indexManager = NIndexPlanManager.getInstance(StreamingDFMergeJobTest.getTestConfig(), PROJECT);

        // Drop every existing segment first.
        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
        update.setToRemoveSegs(df.getSegments().toArray(new NDataSegment[0]));
        mgr.updateDataflow(update);
        df = mgr.getDataflow(df.getId());

        NDataSegment seg1 = mgr.appendSegmentForStreaming(df, (SegmentRange) new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(0L), Long.valueOf(10L),
                this.createKafkaPartitionsOffset(3, 100L), this.createKafkaPartitionsOffset(3, 200L)));
        NDataSegment seg2 = mgr.appendSegmentForStreaming(df, (SegmentRange) new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(10L), Long.valueOf(20L),
                this.createKafkaPartitionsOffset(3, 200L), this.createKafkaPartitionsOffset(3, 300L)));
        NDataSegment seg3 = mgr.appendSegmentForStreaming(df, (SegmentRange) new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(20L), Long.valueOf(30L),
                this.createKafkaPartitionsOffset(3, 300L), this.createKafkaPartitionsOffset(3, 400L)));
        NDataSegment seg4 = mgr.appendSegmentForStreaming(df, (SegmentRange) new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(30L), Long.valueOf(40L),
                this.createKafkaPartitionsOffset(3, 400L), this.createKafkaPartitionsOffset(3, 500L)));

        List<NDataSegment> segments = Lists.newArrayList(seg1, seg2, seg3, seg4);
        for (NDataSegment seg : segments) {
            seg.setStatus(SegmentStatusEnum.READY);
        }
        NDataflowUpdate update2 = new NDataflowUpdate(df.getUuid());
        update2.setToUpdateSegs(segments.toArray(new NDataSegment[0]));

        // One data layout per (index-plan layout, segment) pair, layout-major as before.
        List<NDataLayout> layouts = Lists.newArrayList();
        NDataflow dfCopy = df; // effectively-final alias for use inside the lambda
        indexManager.getIndexPlan(DATAFLOW_ID).getAllLayouts().forEach(layout -> {
            for (NDataSegment seg : segments) {
                layouts.add(NDataLayout.newDataLayout(dfCopy, seg.getId(), layout.getId()));
            }
        });
        update2.setToAddOrUpdateLayouts(layouts.toArray(new NDataLayout[0]));
        mgr.updateDataflow(update2);
        return mgr.getDataflow(df.getId());
    }
}

