/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTreeFactory;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.BuildJobEntry;
import org.apache.kylin.streaming.jobs.StreamingDFBuildJob;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import scala.Tuple3;

public class StreamingDFBuildJobTest
extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    @Test
    public void testStreamingBuild() {
        KylinConfig config = StreamingDFBuildJobTest.getTestConfig();
        KylinBuildEnv.getOrCreate((KylinConfig)config);
        NSparkKafkaSource source = this.createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        NDataflowManager mgr = NDataflowManager.getInstance((KylinConfig)config, (String)PROJECT);
        NDataflow df = mgr.getDataflow(DATAFLOW_ID);
        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
        update.setToRemoveSegs((NDataSegment[])df.getSegments().toArray((Object[])new NDataSegment[0]));
        mgr.updateDataflow(update);
        df = mgr.getDataflow(df.getId());
        Set layoutEntitys = StreamingUtils.getToBuildLayouts((NDataflow)df);
        NSpanningTree nSpanningTree = NSpanningTreeFactory.fromLayouts((Collection)layoutEntitys, (String)DATAFLOW_ID);
        NDataModel model = df.getModel();
        StreamingDFBuildJob builder = (StreamingDFBuildJob)Mockito.spy((Object)new StreamingDFBuildJob(PROJECT));
        StreamingEntry streamingEntry = new StreamingEntry();
        streamingEntry.parseParams(new String[]{PROJECT, DATAFLOW_ID, "3000", "", "xx"});
        SparkSession ss = this.createSparkSession();
        streamingEntry.setSparkSession(ss);
        Tuple3 tuple3 = streamingEntry.generateStreamQueryForOneModel();
        Dataset batchDF = (Dataset)tuple3._1();
        CreateStreamingFlatTable streamFlatTable = (CreateStreamingFlatTable)tuple3._3();
        NDataSegment seg1 = mgr.appendSegmentForStreaming(df, (SegmentRange)new SegmentRange.KafkaOffsetPartitionedSegmentRange(Long.valueOf(0L), Long.valueOf(10L), this.createKafkaPartitionsOffset(3, 100L), this.createKafkaPartitionsOffset(3, 200L)));
        seg1.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate update2 = new NDataflowUpdate(df.getUuid());
        update2.setToUpdateSegs(new NDataSegment[]{seg1});
        ArrayList layouts = Lists.newArrayList();
        NDataflow dfCopy = df;
        NIndexPlanManager indexManager = NIndexPlanManager.getInstance((KylinConfig)StreamingDFBuildJobTest.getTestConfig(), (String)PROJECT);
        indexManager.getIndexPlan(DATAFLOW_ID).getAllLayouts().forEach(layout -> layouts.add(NDataLayout.newDataLayout((NDataflow)dfCopy, (String)seg1.getId(), (long)layout.getId())));
        update2.setToAddOrUpdateLayouts(layouts.toArray(new NDataLayout[0]));
        mgr.updateDataflow(update2);
        streamFlatTable.seg_$eq(seg1);
        Dataset encodedStreamDataset = streamFlatTable.encodeStreamingDataset(seg1, model, batchDF);
        final BuildJobEntry batchBuildJob = new BuildJobEntry(ss, PROJECT, DATAFLOW_ID, 100L, seg1, encodedStreamDataset, nSpanningTree);
        try {
            final NDataflowManager dfMgr = NDataflowManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)PROJECT);
            NDataflow newDataflow = dfMgr.getDataflow(batchBuildJob.dataflowId());
            Assert.assertEquals((Object)RealizationStatusEnum.OFFLINE, (Object)newDataflow.getStatus());
            Assert.assertEquals((long)4L, (long)newDataflow.getSegment(seg1.getId()).getLayoutsMap().size());
            long oldFileCount = newDataflow.getSegment(seg1.getId()).getStorageFileCount();
            long oldByteSize = newDataflow.getSegment(seg1.getId()).getStorageBytesSize();
            builder.streamBuild(batchBuildJob);
            newDataflow = dfMgr.getDataflow(batchBuildJob.dataflowId());
            Assert.assertEquals((Object)RealizationStatusEnum.ONLINE, (Object)newDataflow.getStatus());
            Assert.assertEquals((long)4L, (long)newDataflow.getSegment(seg1.getId()).getLayoutsMap().size());
            Assert.assertTrue((newDataflow.getSegment(seg1.getId()).getStorageFileCount() > oldFileCount ? 1 : 0) != 0);
            Assert.assertTrue((newDataflow.getSegment(seg1.getId()).getStorageBytesSize() > oldByteSize ? 1 : 0) != 0);
            dfMgr.updateDataflow(batchBuildJob.dataflowId(), updater -> updater.setStatus(RealizationStatusEnum.OFFLINE));
            Mockito.when((Object)builder.createRestSupport()).thenReturn((Object)new RestSupport(config){

                public RestResponse execute(HttpRequestBase httpReqBase, Object param) {
                    dfMgr.updateDataflow(batchBuildJob.dataflowId(), updater -> updater.setStatus(RealizationStatusEnum.ONLINE));
                    return RestResponse.ok();
                }
            });
            builder.updateSegment(batchBuildJob);
            newDataflow = dfMgr.getDataflow(batchBuildJob.dataflowId());
            Assert.assertEquals((Object)RealizationStatusEnum.ONLINE, (Object)newDataflow.getStatus());
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testGetSegment() {
        KylinConfig config = StreamingDFBuildJobTest.getTestConfig();
        NSparkKafkaSource source = this.createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        NDataflowManager mgr = NDataflowManager.getInstance((KylinConfig)config, (String)PROJECT);
        String dataflowId = "e78a89dd-847f-4574-8afa-8768b4228b73";
        NDataflow df = mgr.getDataflow("e78a89dd-847f-4574-8afa-8768b4228b73");
        StreamingDFBuildJob builder = new StreamingDFBuildJob(PROJECT);
        builder.setParam("dataflowId", "e78a89dd-847f-4574-8afa-8768b4228b73");
        NDataSegment seg = builder.getSegment("c380dd2a-43b8-4268-b73d-2a5f76236632");
        Assert.assertNotNull((Object)seg);
        Assert.assertEquals((Object)"c380dd2a-43b8-4268-b73d-2a5f76236632", (Object)seg.getId());
    }

    @Test
    public void testShutdown() {
        StreamingDFBuildJob builder = new StreamingDFBuildJob(PROJECT);
        builder.shutdown();
        BuildLayoutWithUpdate buildLayout = (BuildLayoutWithUpdate)ReflectionUtils.getField(builder, "buildLayoutWithUpdate");
        KylinConfig config = StreamingDFBuildJobTest.getTestConfig();
        try {
            buildLayout.submit(new BuildLayoutWithUpdate.JobEntity(){

                public long getIndexId() {
                    return 0L;
                }

                public String getName() {
                    return null;
                }

                public List<NDataLayout> build() throws IOException {
                    return null;
                }
            }, config);
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof RejectedExecutionException));
        }
    }
}

