/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.util;

import com.google.common.cache.Cache;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.ManagementType;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.util.CubeTestUtils;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.spark.sql.SparkSession;
import org.awaitility.Awaitility;
import org.junit.Assert;

public class StreamingTestCase
extends NLocalFileMetadataTestCase {
    /**
     * Alias of the model whose dataflow {@link #createMergeJobEntry} re-reads after
     * removing the existing segments. Not final, so it can be reassigned.
     */
    protected static String MODEL_ALIAS = "stream_merge1";

    /**
     * Appends {@code number} streaming segments to {@code df}, passing neither a file
     * layer nor a follow-up updater to the five-argument overload.
     *
     * @param mgr    manager used to append and persist the segments
     * @param df     dataflow that receives the segments
     * @param number how many segments to create
     * @return whatever the five-argument overload returns
     */
    public NDataflow createSegments(NDataflowManager mgr, NDataflow df, int number) {
        final Integer noLayer = null;
        final NDataflowManager.NDataflowUpdater noUpdater = null;
        return createSegments(mgr, df, number, noLayer, noUpdater);
    }

    /**
     * Appends {@code number} streaming segments to {@code df}, forwarding {@code layer}
     * and no follow-up updater to the five-argument overload.
     *
     * @param mgr    manager used to append and persist the segments
     * @param df     dataflow that receives the segments
     * @param number how many segments to create
     * @param layer  file layer forwarded unchanged; may be {@code null}
     * @return whatever the five-argument overload returns
     */
    public NDataflow createSegments(NDataflowManager mgr, NDataflow df, int number, Integer layer) {
        final NDataflowManager.NDataflowUpdater noUpdater = null;
        return createSegments(mgr, df, number, layer, noUpdater);
    }

    /**
     * Appends {@code number} consecutive streaming segments to {@code df}, marks each one
     * {@code READY} and persists it through a dataflow update.
     *
     * <p>Segment {@code i} covers the range {@code [i, i + 1)} and carries Kafka offsets
     * {@code i * 100} to {@code (i + 1) * 100} for partition 0.
     *
     * @param mgr     manager used to append and persist the segments
     * @param df      dataflow that receives the segments
     * @param number  how many segments to create; asserted to be positive
     * @param layer   when non-null and positive, stored as the {@code "file_layer"}
     *                additional-info entry of every created segment
     * @param updater when non-null, applied to the dataflow once all segments exist
     * @return the dataflow as re-read from {@code mgr} by id
     */
    public NDataflow createSegments(NDataflowManager mgr, NDataflow df, int number, Integer layer, NDataflowManager.NDataflowUpdater updater) {
        Assert.assertTrue(number > 0);
        final boolean tagWithLayer = layer != null && layer > 0;

        long rangeStart = 0L;
        while (rangeStart < (long) number) {
            long rangeEnd = rangeStart + 1L;
            SegmentRange.KafkaOffsetPartitionedSegmentRange range = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                    Long.valueOf(rangeStart),
                    Long.valueOf(rangeEnd),
                    createKafkaPartitionOffset(0, rangeStart * 100L),
                    createKafkaPartitionOffset(0, rangeEnd * 100L));

            NDataSegment segment = mgr.appendSegmentForStreaming(df, range);
            segment.setStatus(SegmentStatusEnum.READY);
            if (tagWithLayer) {
                segment.getAdditionalInfo().put("file_layer", layer.toString());
            }

            // Persist the status (and optional layer tag) of the freshly appended segment.
            NDataflowUpdate segmentUpdate = new NDataflowUpdate(df.getUuid());
            segmentUpdate.setToUpdateSegs(new NDataSegment[]{segment});
            mgr.updateDataflow(segmentUpdate);

            rangeStart = rangeEnd;
        }

        if (updater != null) {
            mgr.updateDataflow(df.getId(), updater);
        }
        return mgr.getDataflow(df.getId());
    }

    /**
     * Sets the {@code storageSize} field of a single segment through
     * {@link ReflectionUtils#setField}.
     *
     * @param seg  segment whose field is set
     * @param size value to store, boxed as a {@code Long}
     */
    public void setSegmentStorageSize(NDataSegment seg, long size) {
        final Long boxedSize = Long.valueOf(size);
        ReflectionUtils.setField(seg, "storageSize", boxedSize);
    }

    /**
     * Sets the {@code storageSize} field of every segment of {@code df} through
     * {@link ReflectionUtils#setField}, then re-reads the dataflow from the manager.
     *
     * <p>NOTE(review): the sizes are written on the segments of the passed-in {@code df};
     * whether the returned dataflow exposes the same segment objects depends on the
     * manager - confirm before relying on the returned value.
     *
     * @param mgr  manager used to re-read the dataflow
     * @param df   dataflow whose segments are modified
     * @param size value to store in every segment
     * @return the dataflow as returned by {@code mgr.getDataflow(df.getId())}
     */
    public NDataflow setSegmentStorageSize(NDataflowManager mgr, NDataflow df, long size) {
        final Long boxedSize = Long.valueOf(size);
        for (NDataSegment segment : df.getSegments()) {
            ReflectionUtils.setField(segment, "storageSize", boxedSize);
        }
        return mgr.getDataflow(df.getId());
    }

    /**
     * Switches the model to {@code MODEL_BASED} management, copies the index plan found
     * under {@code modelAlias}, gives the copy a random UUID and hands it to
     * {@link CubeTestUtils#createTmpModelAndCube}.
     *
     * @param testConfig configuration used to look up the managers
     * @param project    project owning the model and index plan
     * @param modelId    id of the model whose management type is updated
     * @param modelAlias alias used to look up the index plan to copy
     * @return the copied index plan carrying the new UUID
     */
    public IndexPlan createIndexPlan(KylinConfig testConfig, String project, String modelId, String modelAlias) {
        NDataModelManager.getInstance(testConfig, project)
                .updateDataModel(modelId, model -> model.setManagementType(ManagementType.MODEL_BASED));

        IndexPlan template = NIndexPlanManager.getInstance(testConfig, project)
                .getIndexPlanByModelAlias(modelAlias);

        IndexPlan clonedPlan = template.copy();
        clonedPlan.setUuid(RandomUtil.randomUUIDStr());
        CubeTestUtils.createTmpModelAndCube(testConfig, clonedPlan, project, modelId);
        return clonedPlan;
    }

    /**
     * Prepares a {@link MergeJobEntry} over ten freshly created segments.
     *
     * <p>Steps: remove every segment of {@code df}, re-read the dataflow by
     * {@link #MODEL_ALIAS}, create ten {@code READY} segments, request a merge over the
     * range {@code [0, 10)} with file layer 1, verify the merged segment, and wrap
     * everything into a job entry.
     *
     * @param mgr     manager used to clear and repopulate the dataflow
     * @param df      dataflow whose segments are replaced
     * @param ss      Spark session handed to the job entry
     * @param project project used to obtain the manager that performs the merge
     * @return a job entry referencing the ten source segments and the new merged segment
     */
    protected MergeJobEntry createMergeJobEntry(NDataflowManager mgr, NDataflow df, SparkSession ss, String project) {
        // Start from an empty dataflow so the segment counts asserted below are deterministic.
        NDataflowUpdate removeAll = new NDataflowUpdate(df.getUuid());
        removeAll.setToRemoveSegs(df.getSegments().toArray(new NDataSegment[0]));
        mgr.updateDataflow(removeAll);
        df = mgr.getDataflowByModelAlias(MODEL_ALIAS);
        Assert.assertEquals(0L, df.getSegments().size());

        df = this.createSegments(mgr, df, 10, null);
        // Typed (not raw) so the lambda below sees NDataSegment elements; with a raw
        // Segments the element type degrades to Object and seg.getId() cannot compile.
        Segments<NDataSegment> retainSegments = df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.READY, SegmentStatusEnum.WARNING});

        SegmentRange.KafkaOffsetPartitionedSegmentRange rangeToMerge = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(0L),
                Long.valueOf(10L),
                this.createKafkaPartitionOffset(0, 100L),
                this.createKafkaPartitionOffset(0, 1000L));
        NDataflowManager dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
        NDataSegment afterMergeSeg = dfMgr.mergeSegments(dfMgr.getDataflow(df.getId()), rangeToMerge, true, Integer.valueOf(1), null);

        // Ten source segments plus the merged one, which is NEW and tagged with layer "1".
        NDataflow df1 = dfMgr.getDataflow(df.getId());
        Assert.assertEquals(11L, df1.getSegments().size());
        Assert.assertEquals(SegmentStatusEnum.NEW, df1.getSegment(afterMergeSeg.getId()).getStatus());
        Assert.assertEquals("1", df1.getSegment(afterMergeSeg.getId()).getAdditionalInfo().get("file_layer"));

        // Look the retained segments up again in the post-merge dataflow.
        List<NDataSegment> updatedSegments = retainSegments.stream()
                .map(seg -> df1.getSegment(seg.getId()))
                .collect(Collectors.toList());
        AtomicLong globalMergeTime = new AtomicLong(System.currentTimeMillis());
        return new MergeJobEntry(ss, project, df.getId(), 0L, globalMergeTime, updatedSegments, afterMergeSeg);
    }

    /**
     * Obtains a Spark session with master {@code "local"} and application name
     * {@code "test"} via {@code getOrCreate()}.
     *
     * @return the session returned by the builder
     */
    protected SparkSession createSparkSession() {
        SparkSession.Builder localBuilder = SparkSession.builder().master("local");
        return localBuilder.appName("test").getOrCreate();
    }

    /**
     * Builds a fixed Kafka segment range: offsets {@code 0} to {@code 10}, with partition
     * offset maps created for {@code 3} partitions at {@code 100} (start) and
     * {@code 1000} (end).
     *
     * @return the newly constructed range
     */
    public SegmentRange.KafkaOffsetPartitionedSegmentRange createSegmentRange() {
        final int partitionCount = 3;
        return new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                Long.valueOf(0L),
                Long.valueOf(10L),
                createKafkaPartitionsOffset(partitionCount, 100L),
                createKafkaPartitionsOffset(partitionCount, 1000L));
    }

    /**
     * Builds a Kafka segment range from the given bounds.
     *
     * @param startOffset          start of the segment range
     * @param endOffset            end of the segment range
     * @param partitions           partition count passed to {@code createKafkaPartitionsOffset}
     * @param partitionStartOffset offset used for the start partition-offset map
     * @param partitionEndOffset   offset used for the end partition-offset map
     * @return the newly constructed range
     */
    public SegmentRange.KafkaOffsetPartitionedSegmentRange createSegmentRange(long startOffset, long endOffset, int partitions, long partitionStartOffset, long partitionEndOffset) {
        final Long rangeStart = Long.valueOf(startOffset);
        final Long rangeEnd = Long.valueOf(endOffset);
        return new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                rangeStart,
                rangeEnd,
                createKafkaPartitionsOffset(partitions, partitionStartOffset),
                createKafkaPartitionsOffset(partitions, partitionEndOffset));
    }

    /**
     * Resolves an {@link NSparkKafkaSource} from {@link SourceFactory} for the given
     * configuration.
     *
     * <p>The factory's {@code sourceMap} cache is read reflectively and invalidated first,
     * so the lookup does not return an entry cached by an earlier call.
     *
     * @param config configuration exposed by the source descriptor
     * @return the source returned by the factory, cast to {@link NSparkKafkaSource}
     */
    public NSparkKafkaSource createSparkKafkaSource(final KylinConfig config) {
        ISourceAware sourceAware = new ISourceAware() {

            @Override
            public int getSourceType() {
                // NOTE(review): 1 is presumably the streaming/Kafka source id - confirm against ISourceAware.
                return 1;
            }

            @Override
            public KylinConfig getConfig() {
                return config;
            }
        };
        Cache<?, ?> cache = (Cache<?, ?>) ReflectionUtils.getField(SourceFactory.class, "sourceMap");
        cache.invalidateAll();
        NSparkKafkaSource source = (NSparkKafkaSource) SourceFactory.getSource(sourceAware);
        // JUnit assertion instead of the `assert` keyword, which is skipped when the JVM runs without -ea.
        Assert.assertTrue(source.supportBuildSnapShotByPartition());
        return source;
    }

    /**
     * Runs {@code callback} up to three times and fails the test if no attempt completes
     * without throwing an {@link Exception}.
     *
     * <p>Only {@code Exception} subclasses trigger a retry; an {@link Error} such as
     * {@link AssertionError} thrown by the callback is not caught here. The last caught
     * exception is attached as the cause of the final failure, so the reason is visible
     * in the test report.
     *
     * <p>NOTE(review): the awaited condition always returns {@code true}, so the three
     * attempts run back to back and {@code atMost(60s)} presumably only bounds their
     * total run time - confirm whether polling until success was intended.
     *
     * @param callback action to execute
     * @throws AssertionError if all three attempts threw an exception
     */
    public void testWithRetry(Callback callback) {
        final int maxAttempts = 3;
        AtomicBoolean assertMeet = new AtomicBoolean(false);
        AtomicReference<Exception> lastFailure = new AtomicReference<>();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            for (int attempt = 0; attempt < maxAttempts; ++attempt) {
                try {
                    callback.call();
                    assertMeet.set(true);
                    break;
                } catch (Exception e) {
                    // Remember why this attempt failed, then fall through to the next one.
                    lastFailure.set(e);
                }
            }
            return true;
        });
        if (!assertMeet.get()) {
            throw new AssertionError("callback failed in all " + maxAttempts + " attempts", lastFailure.get());
        }
    }

    /**
     * Action executed by {@code testWithRetry}; it may throw any checked exception.
     */
    @FunctionalInterface
    public interface Callback {
        /**
         * Performs the action.
         *
         * @throws Exception if the action fails
         */
        void call() throws Exception;
    }
}

