package org.apache.kylin.streaming.util;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.guava30.shaded.common.cache.Cache;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.ManagementType;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.spark.sql.SparkSession;
import org.awaitility.Awaitility;
import org.junit.Assert;

/**
 * Shared helpers for streaming tests: building streaming segments on a dataflow, preparing a
 * {@link MergeJobEntry}, creating a local {@link SparkSession} and a Kafka source, and running a
 * callback with a bounded number of retries.
 */
public class StreamingTestCase extends NLocalFileMetadataTestCase {
    /** Alias of the model whose dataflow {@link #createMergeJobEntry} rebuilds. */
    protected static String MODEL_ALIAS = "stream_merge1";

    /** Key in a segment's additional-info map that this class reads and writes. */
    private static final String FILE_LAYER_KEY = "file_layer";

    /** Maximum number of times {@link #testWithRetry(Callback)} invokes the callback. */
    private static final int RETRY_ATTEMPTS = 3;

    /** Upper bound, in seconds, handed to Awaitility for all retry attempts together. */
    private static final long RETRY_TIMEOUT_SECONDS = 60L;

    /** A unit of test work that is allowed to throw a checked exception. */
    @FunctionalInterface
    public interface Callback {
        /**
         * Runs the work.
         *
         * @throws Exception if the work fails; {@link #testWithRetry(Callback)} treats this as a
         *         retryable failure
         */
        void call() throws Exception;
    }

    /**
     * Appends {@code i} READY streaming segments without setting a file layer and without a final
     * updater.
     *
     * @param nDataflowManager manager used to append and persist the segments
     * @param nDataflow dataflow that receives the segments
     * @param i number of segments to create; must be positive
     * @return the dataflow as re-read from the manager after all updates
     */
    public NDataflow createSegments(NDataflowManager nDataflowManager, NDataflow nDataflow, int i) {
        return createSegments(nDataflowManager, nDataflow, i, null, null);
    }

    /**
     * Appends {@code i} READY streaming segments, optionally tagging each with a file layer.
     *
     * @param nDataflowManager manager used to append and persist the segments
     * @param nDataflow dataflow that receives the segments
     * @param i number of segments to create; must be positive
     * @param num file layer to record on each segment; ignored when {@code null} or not positive
     * @return the dataflow as re-read from the manager after all updates
     */
    public NDataflow createSegments(NDataflowManager nDataflowManager, NDataflow nDataflow, int i, Integer num) {
        return createSegments(nDataflowManager, nDataflow, i, num, null);
    }

    /**
     * Appends {@code i} READY streaming segments to a dataflow, one update per segment.
     *
     * <p>Segment {@code k} (starting at 0) covers the range {@code [k, k + 1)} and uses
     * {@code createKafkaPartitionOffset(0, k * 100)} and
     * {@code createKafkaPartitionOffset(0, (k + 1) * 100)} as its source offsets.
     *
     * @param nDataflowManager manager used to append and persist the segments
     * @param nDataflow dataflow that receives the segments
     * @param i number of segments to create; a JUnit assertion fails when it is not positive
     * @param num file layer stored under {@code "file_layer"} in each segment's additional info;
     *        ignored when {@code null} or not positive
     * @param nDataflowUpdater optional updater applied to the dataflow once after all segments
     *        were added; may be {@code null}
     * @return the dataflow as re-read from the manager after all updates
     */
    public NDataflow createSegments(NDataflowManager nDataflowManager, NDataflow nDataflow, int i, Integer num, NDataflowManager.NDataflowUpdater nDataflowUpdater) {
        Assert.assertTrue(i > 0);
        boolean tagFileLayer = num != null && num > 0;
        for (long index = 0; index < i; index++) {
            long next = index + 1;
            SegmentRange.KafkaOffsetPartitionedSegmentRange range = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                    index, next, createKafkaPartitionOffset(0, index * 100), createKafkaPartitionOffset(0, next * 100));
            NDataSegment segment = nDataflowManager.appendSegmentForStreaming(nDataflow, range);
            segment.setStatus(SegmentStatusEnum.READY);
            if (tagFileLayer) {
                segment.getAdditionalInfo().put(FILE_LAYER_KEY, String.valueOf(num));
            }
            // Persist the status (and file layer) change for this segment right away.
            NDataflowUpdate update = new NDataflowUpdate(nDataflow.getUuid());
            update.setToUpdateSegs(new NDataSegment[]{segment});
            nDataflowManager.updateDataflow(update);
        }
        if (nDataflowUpdater != null) {
            nDataflowManager.updateDataflow(nDataflow.getId(), nDataflowUpdater);
        }
        return nDataflowManager.getDataflow(nDataflow.getId());
    }

    /**
     * Overwrites the {@code storageSize} field of a segment via reflection.
     *
     * @param nDataSegment segment to modify
     * @param j value to store in the field
     */
    public void setSegmentStorageSize(NDataSegment nDataSegment, long j) {
        ReflectionUtils.setField(nDataSegment, "storageSize", Long.valueOf(j));
    }

    /**
     * Overwrites the {@code storageSize} field of every segment of the given dataflow via
     * reflection.
     *
     * @param nDataflowManager manager used to re-read the dataflow afterwards
     * @param nDataflow dataflow whose segments are modified
     * @param j value to store in each segment's field
     * @return the dataflow as returned by the manager for the same id
     */
    public NDataflow setSegmentStorageSize(NDataflowManager nDataflowManager, NDataflow nDataflow, long j) {
        for (NDataSegment segment : nDataflow.getSegments()) {
            setSegmentStorageSize(segment, j);
        }
        return nDataflowManager.getDataflow(nDataflow.getId());
    }

    /**
     * Switches a model to {@link ManagementType#MODEL_BASED} and creates a temporary model and
     * cube from a copy of an existing index plan.
     *
     * @param kylinConfig configuration used to obtain the managers
     * @param str project name
     * @param str2 id of the model to update and to pass to
     *        {@code CubeTestUtils.createTmpModelAndCube}
     * @param str3 alias of the model whose index plan is copied
     * @return the copied index plan, carrying a freshly generated UUID
     */
    public IndexPlan createIndexPlan(KylinConfig kylinConfig, String str, String str2, String str3) {
        NDataModelManager.getInstance(kylinConfig, str)
                .updateDataModel(str2, model -> model.setManagementType(ManagementType.MODEL_BASED));
        IndexPlan planCopy = NIndexPlanManager.getInstance(kylinConfig, str).getIndexPlanByModelAlias(str3).copy();
        planCopy.setUuid(RandomUtil.randomUUIDStr());
        CubeTestUtils.createTmpModelAndCube(kylinConfig, planCopy, str, str2);
        return planCopy;
    }

    /**
     * Rebuilds the dataflow of {@link #MODEL_ALIAS} with ten fresh READY segments, requests a
     * merge over them and wraps the result in a {@link MergeJobEntry}.
     *
     * <p>Along the way it asserts that the dataflow is empty after the removal, that it holds
     * eleven segments after the merge request, and that the merged segment is NEW with
     * {@code "file_layer"} equal to {@code "1"}.
     *
     * @param nDataflowManager manager used to clear and repopulate the dataflow
     * @param nDataflow dataflow whose current segments are all removed first
     * @param sparkSession session handed to the job entry
     * @param str project name, used for the second manager lookup and for the job entry
     * @return a job entry referencing the ten source segments and the merged segment
     */
    public MergeJobEntry createMergeJobEntry(NDataflowManager nDataflowManager, NDataflow nDataflow, SparkSession sparkSession, String str) {
        // Start from a clean slate: drop every segment the dataflow currently holds.
        NDataflowUpdate removeAll = new NDataflowUpdate(nDataflow.getUuid());
        removeAll.setToRemoveSegs((NDataSegment[]) nDataflow.getSegments().toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(removeAll);
        NDataflow emptied = nDataflowManager.getDataflowByModelAlias(MODEL_ALIAS);
        Assert.assertEquals(0L, emptied.getSegments().size());

        NDataflow populated = createSegments(nDataflowManager, emptied, 10, null);
        Segments<NDataSegment> sourceSegments = populated.getSegments(
                new SegmentStatusEnum[]{SegmentStatusEnum.READY, SegmentStatusEnum.WARNING});
        SegmentRange.KafkaOffsetPartitionedSegmentRange mergeRange = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                0L, 10L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 1000L));

        // The merge goes through a manager obtained from the environment config for the project.
        NDataflowManager projectManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
        String dataflowId = populated.getId();
        NDataSegment merged = projectManager.mergeSegments(
                projectManager.getDataflow(dataflowId), mergeRange, true, 1, (String) null);
        NDataflow afterMerge = projectManager.getDataflow(dataflowId);
        Assert.assertEquals(11L, afterMerge.getSegments().size());
        Assert.assertEquals(SegmentStatusEnum.NEW, afterMerge.getSegment(merged.getId()).getStatus());
        Assert.assertEquals("1", afterMerge.getSegment(merged.getId()).getAdditionalInfo().get(FILE_LAYER_KEY));

        // Hand the job the segment instances from the post-merge dataflow, looked up by id.
        List<NDataSegment> segmentsToMerge = sourceSegments.stream()
                .map(segment -> afterMerge.getSegment(segment.getId()))
                .collect(Collectors.toList());
        return new MergeJobEntry(sparkSession, str, dataflowId, 0L, new AtomicLong(System.currentTimeMillis()),
                segmentsToMerge, merged);
    }

    /**
     * Returns a Spark session with master {@code "local"} and application name {@code "test"},
     * reusing an existing session when the builder finds one.
     *
     * @return the session
     */
    public SparkSession createSparkSession() {
        return SparkSession.builder().master("local").appName("test").getOrCreate();
    }

    /**
     * Creates the default segment range {@code [0, 10)} with offsets built by
     * {@code createKafkaPartitionsOffset(3, 100)} and {@code createKafkaPartitionsOffset(3, 1000)}.
     *
     * @return the segment range
     */
    public SegmentRange.KafkaOffsetPartitionedSegmentRange createSegmentRange() {
        return new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 10L, createKafkaPartitionsOffset(3, 100L), createKafkaPartitionsOffset(3, 1000L));
    }

    /**
     * Creates a segment range {@code [j, j2)} with offsets built by
     * {@code createKafkaPartitionsOffset(i, j3)} and {@code createKafkaPartitionsOffset(i, j4)}.
     *
     * @param j range start
     * @param j2 range end
     * @param i first argument for {@code createKafkaPartitionsOffset} (presumably the partition
     *        count; confirm against the parent class)
     * @param j3 offset value used for the start offsets
     * @param j4 offset value used for the end offsets
     * @return the segment range
     */
    public SegmentRange.KafkaOffsetPartitionedSegmentRange createSegmentRange(long j, long j2, int i, long j3, long j4) {
        return new SegmentRange.KafkaOffsetPartitionedSegmentRange(j, j2, createKafkaPartitionsOffset(i, j3), createKafkaPartitionsOffset(i, j4));
    }

    /**
     * Obtains a Kafka source from {@link SourceFactory} for the given configuration.
     *
     * <p>The factory's {@code sourceMap} cache is invalidated via reflection first, presumably so
     * that a source cached for another configuration is not reused.
     *
     * @param kylinConfig configuration reported by the source-aware object passed to the factory
     * @return the source returned by the factory
     */
    public NSparkKafkaSource createSparkKafkaSource(final KylinConfig kylinConfig) {
        ISourceAware sourceAware = new ISourceAware() {
            public int getSourceType() {
                // NOTE(review): 1 is presumably the streaming/Kafka source type id; confirm against ISourceAware.
                return 1;
            }

            public KylinConfig getConfig() {
                return kylinConfig;
            }
        };
        Cache<?, ?> sourceCache = (Cache<?, ?>) ReflectionUtils.getField((Class<?>) SourceFactory.class, "sourceMap");
        sourceCache.invalidateAll();
        NSparkKafkaSource source = (NSparkKafkaSource) SourceFactory.getSource(sourceAware);
        assert source.supportBuildSnapShotByPartition();
        return source;
    }

    /**
     * Runs the callback until it completes without throwing an {@link Exception}, trying at most
     * three times, with all attempts bounded by a 60 second Awaitility timeout.
     *
     * <p>Only {@link Exception}s trigger a retry; an {@link Error} thrown by the callback (for
     * example a failed JUnit assertion) is not caught here.
     *
     * @param callback work to run
     * @throws AssertionError if every attempt threw an exception; the last exception is attached
     *         as the cause
     */
    public void testWithRetry(Callback callback) {
        AtomicBoolean succeeded = new AtomicBoolean(false);
        AtomicReference<Exception> lastFailure = new AtomicReference<>();
        // The condition always reports true, so Awaitility evaluates it once; atMost only caps
        // the total time the attempts may take.
        Awaitility.await().atMost(RETRY_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            for (int attempt = 0; attempt < RETRY_ATTEMPTS; attempt++) {
                try {
                    callback.call();
                    succeeded.set(true);
                    break;
                } catch (Exception e) {
                    // Keep the failure so it can be reported if no attempt succeeds.
                    lastFailure.set(e);
                }
            }
            return true;
        });
        if (!succeeded.get()) {
            throw new AssertionError("Callback failed on all " + RETRY_ATTEMPTS + " attempts", lastFailure.get());
        }
    }
}
