package org.apache.kylin.streaming;

import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.streaming.app.StreamingMergeEntry;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:org/apache/kylin/streaming/StreamingMergeEntryTest.class */
public class StreamingMergeEntryTest extends StreamingTestCase {

    /** JUnit rule used by the tests below that expect a specific exception type or message. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingMergeEntryTest.class);

    /** Project name handed to every manager lookup and merge call in this class. */
    private static final String PROJECT = "streaming_test";

    /** Id of the dataflow the tests in this class operate on. */
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";

    /**
     * Creates the test metadata before each test, passing an empty argument array.
     *
     * @throws Exception if metadata creation fails
     */
    @Before
    public void setUp() throws Exception {
        final String[] noArgs = new String[0];
        createTestMetadata(noArgs);
    }

    /** Removes the test metadata after each test. */
    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    /**
     * Replaces the dataflow's segments with 11 freshly created ones (storage size 1024 each), runs one
     * merge pass with a segment-count setting of 10 and a size threshold of 20480, and expects two
     * segments to remain: the first tagged {@code file_layer=1}, the second with empty additional info.
     */
    @Test
    public void testMergeSegmentLayer0() {
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.spark.cluster-manager-class-name",
                "io.kyligence.kap.streaming.util.MockClusterManager");
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(20480L);
            entry.setNumberOfSeg(10);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 11);
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(2L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("1", head.getAdditionalInfo().get("file_layer"));
            NDataSegment tail = (NDataSegment) after.getSegments().get(1);
            Assert.assertTrue(tail.getAdditionalInfo().isEmpty());
        });
    }

    /**
     * Seeds 10 segments (storage size 1024 each), runs the merge pass four times with a segment-count
     * setting of 3, and expects two segments to remain: the first tagged {@code file_layer=2}, the
     * second with empty additional info.
     */
    @Test
    public void testMergeSegment() {
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.spark.cluster-manager-class-name",
                "io.kyligence.kap.streaming.util.MockClusterManager");
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(20480L);
            entry.setNumberOfSeg(3);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 10);
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            // Four consecutive merge passes over the same dataflow.
            int passesLeft = 4;
            while (passesLeft-- > 0) {
                entry.process(PROJECT, DATAFLOW_ID);
            }
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(2L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("2", head.getAdditionalInfo().get("file_layer"));
            NDataSegment tail = (NDataSegment) after.getSegments().get(1);
            Assert.assertTrue(tail.getAdditionalInfo().isEmpty());
        });
    }

    /**
     * Seeds 10 segments through the four-argument {@code createSegments} overload (last argument
     * {@code 1}; presumably the initial file layer, confirm against StreamingTestCase), runs one merge
     * pass, and expects all 10 segments to still be present, each tagged {@code file_layer=1}.
     */
    @Test
    public void testMergeSegmentLayer1() {
        final KylinConfig config = getTestConfig();
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(20480L);
            entry.setNumberOfSeg(3);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 10, 1);
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(10L, after.getSegments().size());
            for (NDataSegment seg : after.getSegments()) {
                Assert.assertEquals("1", seg.getAdditionalInfo().get("file_layer"));
            }
        });
    }

    /**
     * Seeds 16 segments (storage size 1024 each), runs one merge pass with a segment-count setting of 3
     * and a size threshold of 20480, and expects two segments to remain: the first tagged
     * {@code file_layer=1}, the second with empty additional info.
     */
    @Test
    public void testMergeSegmentOfCatchup1() {
        final KylinConfig config = getTestConfig();
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(20480L);
            entry.setNumberOfSeg(3);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 16);
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(2L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("1", head.getAdditionalInfo().get("file_layer"));
            NDataSegment tail = (NDataSegment) after.getSegments().get(1);
            Assert.assertTrue(tail.getAdditionalInfo().isEmpty());
        });
    }

    /**
     * With {@code kylin.engine.streaming-segment-merge-ratio} set to {@code 1} and a size threshold of
     * 14336, seeds 16 segments (storage size 1024 each), runs one merge pass, and expects three
     * segments to remain: the first tagged {@code file_layer=1}, the other two with empty additional
     * info.
     */
    @Test
    public void testMergeSegmentOfCatchup2() {
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-segment-merge-ratio", "1");
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(14336L);
            entry.setNumberOfSeg(3);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 16);
            Assert.assertEquals(16L, seeded.getSegments().size());
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(3L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("1", head.getAdditionalInfo().get("file_layer"));
            // Every segment after the first is expected to carry no additional info.
            for (int idx = 1; idx < 3; idx++) {
                NDataSegment untouched = (NDataSegment) after.getSegments().get(idx);
                Assert.assertTrue(untouched.getAdditionalInfo().isEmpty());
            }
        });
    }

    /**
     * With {@code kylin.engine.streaming-segment-merge-ratio} set to {@code 1} and a size threshold of
     * 30720, seeds 21 segments where indices 0-1 are tagged {@code file_layer=2} and indices 2-4 are
     * tagged {@code file_layer=1}, runs one merge pass, and expects two segments to remain: the first
     * tagged {@code file_layer=1}, the second with empty additional info.
     */
    @Test
    public void testMergeSegmentOfCatchup3() {
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-segment-merge-ratio", "1");
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(30720L);
            entry.setNumberOfSeg(3);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 21, null, df -> {
                // Indices 0-1 get layer "2", indices 2-4 get layer "1"; the rest stay untagged.
                for (int idx = 0; idx < 5; idx++) {
                    String layer = idx < 2 ? "2" : "1";
                    ((NDataSegment) df.getSegments().get(idx)).getAdditionalInfo().put("file_layer", layer);
                }
            });
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(2L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("1", head.getAdditionalInfo().get("file_layer"));
            NDataSegment tail = (NDataSegment) after.getSegments().get(1);
            Assert.assertTrue(tail.getAdditionalInfo().isEmpty());
        });
    }

    /**
     * With {@code kylin.engine.streaming-segment-merge-ratio} set to {@code 1} and a size threshold of
     * 16384, seeds 19 segments where index 0 is tagged {@code file_layer=2} and index 1 is tagged
     * {@code file_layer=1}, runs one merge pass, and expects four segments to remain: the first tagged
     * {@code file_layer=1}, the other three with empty additional info.
     */
    @Test
    public void testMergeSegmentOfCatchup4() {
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-segment-merge-ratio", "1");
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(16384L);
            entry.setNumberOfSeg(3);
            entry.setSparkSession(createSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 19, null, df -> {
                // Only the first two segments are tagged: index 0 with layer "2", index 1 with layer "1".
                ((NDataSegment) df.getSegments().get(0)).getAdditionalInfo().put("file_layer", "2");
                ((NDataSegment) df.getSegments().get(1)).getAdditionalInfo().put("file_layer", "1");
            });
            setSegmentStorageSize(dfMgr, seeded, 1024L);
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().close();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(4L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("1", head.getAdditionalInfo().get("file_layer"));
            // Every segment after the first is expected to carry no additional info.
            for (int idx = 1; idx < 4; idx++) {
                NDataSegment untouched = (NDataSegment) after.getSegments().get(idx);
                Assert.assertTrue(untouched.getAdditionalInfo().isEmpty());
            }
        });
    }

    /**
     * Seeds 6 segments: indices 0-1 tagged {@code file_layer=2}, indices 2-3 tagged
     * {@code file_layer=1}, indices 4-5 untagged. The first four get a storage size of 2048 and the
     * last two 5120. With a size threshold of 5120 and a segment-count setting of 5, one merge pass is
     * expected to leave two segments: the first tagged {@code file_layer=1}, the second with empty
     * additional info.
     */
    @Test
    public void testMergeSegmentOfPeak1() {
        final KylinConfig config = getTestConfig();
        testWithRetry(() -> {
            StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
            entry.setThresholdOfSegSize(5120L);
            entry.setNumberOfSeg(5);
            entry.setSparkSession(createSparkSession());
            Assert.assertNotNull(entry.getSparkSession());

            // Drop every segment currently attached to the dataflow before seeding new ones.
            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
            NDataflow before = dfMgr.getDataflow(DATAFLOW_ID);
            NDataflowUpdate dropAll = new NDataflowUpdate(before.getUuid());
            dropAll.setToRemoveSegs(before.getSegments().toArray(new NDataSegment[0]));
            dfMgr.updateDataflow(dropAll);

            NDataflow seeded = createSegments(dfMgr, dfMgr.getDataflow(DATAFLOW_ID), 6, null, df -> {
                // Indices 0-1 get layer "2", indices 2-3 get layer "1". Indices 4-5 are left untagged
                // (the previous version iterated over them with an empty loop body, which did nothing).
                for (int idx = 0; idx < 4; idx++) {
                    String layer = idx < 2 ? "2" : "1";
                    ((NDataSegment) df.getSegments().get(idx)).getAdditionalInfo().put("file_layer", layer);
                }
            });

            // First four segments are small (2048); the last two sit exactly at the threshold (5120).
            for (int idx = 0; idx < 6; idx++) {
                long storageSize = idx < 4 ? 2048L : 5120L;
                setSegmentStorageSize((NDataSegment) seeded.getSegments().get(idx), storageSize);
            }
            // NOTE(review): the result of this lookup is discarded; kept in case the call is relied on
            // for a side effect such as refreshing the manager's view. Confirm and remove if not.
            dfMgr.getDataflow(seeded.getId());
            mockRestSupport(entry, config, 0);

            entry.process(PROJECT, DATAFLOW_ID);
            entry.getSparkSession().stop();

            NDataflow after = dfMgr.getDataflow(DATAFLOW_ID);
            Assert.assertEquals(2L, after.getSegments().size());
            NDataSegment head = (NDataSegment) after.getSegments().get(0);
            Assert.assertEquals("1", head.getAdditionalInfo().get("file_layer"));
            NDataSegment tail = (NDataSegment) after.getSegments().get(1);
            Assert.assertTrue(tail.getAdditionalInfo().isEmpty());
        });
    }

    /**
     * Invokes {@code removeLastL0Segment} reflectively on an empty {@link Segments} list and checks
     * that the list is still empty afterwards.
     */
    @Test
    public void testRemoveLastL0Segment_EmptySegment() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        Segments noSegments = new Segments();

        Object[] invokeArgs = {noSegments};
        ReflectionTestUtils.invokeMethod(entry, "removeLastL0Segment", invokeArgs);

        Assert.assertTrue(noSegments.isEmpty());
    }

    /**
     * Builds three READY segments covering consecutive months from 2000-01-01, each with its
     * additional info set to {@code null}, invokes {@code removeLastL0Segment} reflectively, and
     * expects all three segments to remain in the list.
     */
    @Test
    public void testRemoveLastL0Segment_AddInfo_Null() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        Segments segs = new Segments();
        NDataflow owner = NDataflowManager.getInstance(getTestConfig(), PROJECT).getDataflow(DATAFLOW_ID);

        int month = 0;
        while (month < 3) {
            LocalDate from = LocalDate.parse("2000-01-01").plusMonths(month);
            LocalDate to = from.plusMonths(1L);
            NDataSegment seg = new NDataSegment();
            SegmentRange.TimePartitionedSegmentRange range =
                    new SegmentRange.TimePartitionedSegmentRange(from.toString(), to.toString());
            seg.setId(RandomUtil.randomUUIDStr());
            seg.setName(Segments.makeSegmentName(range));
            seg.setCreateTimeUTC(System.currentTimeMillis());
            seg.setSegmentRange(range);
            seg.setStatus(SegmentStatusEnum.READY);
            // The case under test: no additional-info map at all.
            seg.setAdditionalInfo((Map) null);
            seg.setDataflow(owner);
            segs.add(seg);
            month++;
        }

        Object[] invokeArgs = {segs};
        ReflectionTestUtils.invokeMethod(entry, "removeLastL0Segment", invokeArgs);

        Assert.assertEquals(3L, segs.size());
    }

    /**
     * Builds three READY segments covering consecutive months from 2000-01-01 that all share one
     * additional-info map containing only the key {@code abc} (no {@code file_layer} entry), invokes
     * {@code removeLastL0Segment} reflectively, and expects two segments to remain in the list.
     */
    @Test
    public void testRemoveLastL0Segment_FileLayer_Null() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        Segments segs = new Segments();
        HashMap sharedInfo = new HashMap();
        NDataflow owner = NDataflowManager.getInstance(getTestConfig(), PROJECT).getDataflow(DATAFLOW_ID);
        sharedInfo.put("abc", "2");

        int month = 0;
        while (month < 3) {
            LocalDate from = LocalDate.parse("2000-01-01").plusMonths(month);
            LocalDate to = from.plusMonths(1L);
            NDataSegment seg = new NDataSegment();
            SegmentRange.TimePartitionedSegmentRange range =
                    new SegmentRange.TimePartitionedSegmentRange(from.toString(), to.toString());
            seg.setId(RandomUtil.randomUUIDStr());
            seg.setName(Segments.makeSegmentName(range));
            seg.setCreateTimeUTC(System.currentTimeMillis());
            seg.setSegmentRange(range);
            seg.setStatus(SegmentStatusEnum.READY);
            // Every segment points at the same map instance.
            seg.setAdditionalInfo(sharedInfo);
            seg.setDataflow(owner);
            segs.add(seg);
            month++;
        }

        Object[] invokeArgs = {segs};
        ReflectionTestUtils.invokeMethod(entry, "removeLastL0Segment", invokeArgs);

        Assert.assertEquals(2L, segs.size());
    }

    /**
     * Builds three READY segments covering consecutive months from 2000-01-01 that all share one
     * additional-info map with {@code file_layer=2}, invokes {@code removeLastL0Segment} reflectively,
     * and expects all three segments to remain in the list.
     */
    @Test
    public void testRemoveLastL0Segment_FileLayer_NotNull() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        Segments segs = new Segments();
        HashMap sharedInfo = new HashMap();
        NDataflow owner = NDataflowManager.getInstance(getTestConfig(), PROJECT).getDataflow(DATAFLOW_ID);
        sharedInfo.put("file_layer", "2");

        int month = 0;
        while (month < 3) {
            LocalDate from = LocalDate.parse("2000-01-01").plusMonths(month);
            LocalDate to = from.plusMonths(1L);
            NDataSegment seg = new NDataSegment();
            SegmentRange.TimePartitionedSegmentRange range =
                    new SegmentRange.TimePartitionedSegmentRange(from.toString(), to.toString());
            seg.setId(RandomUtil.randomUUIDStr());
            seg.setName(Segments.makeSegmentName(range));
            seg.setCreateTimeUTC(System.currentTimeMillis());
            seg.setSegmentRange(range);
            seg.setStatus(SegmentStatusEnum.READY);
            // Every segment points at the same map instance.
            seg.setAdditionalInfo(sharedInfo);
            seg.setDataflow(owner);
            segs.add(seg);
            month++;
        }

        Object[] invokeArgs = {segs};
        ReflectionTestUtils.invokeMethod(entry, "removeLastL0Segment", invokeArgs);

        Assert.assertEquals(3L, segs.size());
    }

    /**
     * Runs {@code execute} on a {@link StreamingMergeEntry} whose REST support always answers
     * {@code RestResponse.ok("001")}, using a dataflow id carrying an {@code -err} suffix, and checks
     * that any exception raised is an {@link ExecuteException}.
     */
    @Test
    public void testScheduleException() {
        getTestConfig().setProperty("kylin.engine.streaming-segment-merge-interval", "1");
        final String[] args = {PROJECT, DATAFLOW_ID + "-err", "5k", "5", "xx"};
        try {
            createSparkSession();
            StreamingMergeEntry entry = new StreamingMergeEntry() {
                public RestSupport createRestSupport(KylinConfig kylinConfig) {
                    // Stubbed REST layer: every request is answered with a fixed OK response.
                    return new RestSupport(kylinConfig) {
                        public RestResponse<String> execute(HttpRequestBase httpRequestBase, Object obj) {
                            return RestResponse.ok("001");
                        }
                    };
                }
            };
            entry.execute(args);
        } catch (Exception e) {
            // NOTE(review): this assertion only runs when something is thrown; if execute() returns
            // normally the test passes without checking anything. Confirm whether a failure is
            // guaranteed here and, if so, consider failing explicitly after execute().
            Assert.assertTrue(e instanceof ExecuteException);
        }
    }

    /**
     * Sets one known segment to WARNING and then to NEW, checking after each change that
     * {@code getSegment} returns a segment with that status, and finally expects a
     * {@link KylinException} when {@code getSegment} is asked for {@code NDataSegment.empty()}.
     */
    @Test
    public void testGetSegment() {
        final String segId = "c380dd2a-43b8-4268-b73d-2a5f76236633";
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);

        // Phase 1: segment in WARNING status.
        dfMgr.updateDataflow(dfMgr.getDataflow(DATAFLOW_ID).getId(), copy -> {
            copy.getSegment(segId).setStatus(SegmentStatusEnum.WARNING);
        });
        NDataflow warned = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(SegmentStatusEnum.WARNING,
                entry.getSegment(warned.getSegments(), warned.getSegment(segId), PROJECT, DATAFLOW_ID).getStatus());

        // Phase 2: same segment switched to NEW status.
        dfMgr.updateDataflow(warned.getId(), copy -> {
            copy.getSegment(segId).setStatus(SegmentStatusEnum.NEW);
        });
        NDataflow renewed = dfMgr.getDataflow(DATAFLOW_ID);
        Assert.assertEquals(SegmentStatusEnum.NEW,
                entry.getSegment(renewed.getSegments(), renewed.getSegment(segId), PROJECT, DATAFLOW_ID).getStatus());

        // Phase 3: an empty segment is expected to be rejected.
        thrown.expect(KylinException.class);
        entry.getSegment(renewed.getSegments(), NDataSegment.empty(), PROJECT, DATAFLOW_ID);
    }

    /**
     * Calls {@code removeSegment} with {@code NDataSegment.empty()} against a mocked REST support and
     * checks that the dataflow has no segment with that segment's id afterwards.
     */
    @Test
    public void testRemoveSegment() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        KylinConfig config = getTestConfig();
        mockRestSupport(entry, config, "new-seg-123456");
        NDataSegment placeholder = NDataSegment.empty();

        String[] params = {PROJECT, DATAFLOW_ID, "32m", "3", "xx"};
        entry.parseParams(params);
        entry.removeSegment(PROJECT, DATAFLOW_ID, placeholder);

        NDataflow dataflow = NDataflowManager.getInstance(config, PROJECT).getDataflow(DATAFLOW_ID);
        Assert.assertNull(dataflow.getSegment(placeholder.getId()));
    }

    /**
     * Uses a {@link StreamingMergeEntry} subclass whose {@code allocateSegment} always throws a
     * {@link KylinException} with {@code SEGMENT_MERGE_FAILURE}, and expects {@code mergeSegments} to
     * surface a {@link KylinException}.
     */
    @Test
    public void testMergeSegmentsException() {
        getTestConfig().setProperty("kylin.engine.streaming-segment-merge-interval", "0s");
        StreamingMergeEntry failingEntry = new StreamingMergeEntry() {
            public NDataSegment allocateSegment(String str, String str2, List<NDataSegment> list, int i) {
                throw new KylinException(ServerErrorCode.SEGMENT_MERGE_FAILURE, "merge Exception");
            }
        };
        StreamingMergeEntry entry = Mockito.spy(failingEntry);
        List toMerge = Arrays.asList(new NDataSegment());

        thrown.expect(KylinException.class);
        entry.mergeSegments(PROJECT, DATAFLOW_ID, toMerge, 1);
    }

    /**
     * Runs {@code doExecute} with {@code process} stubbed to do nothing while a background thread sets
     * the stop flag after a three-second Awaitility poll delay. The test makes no assertion of its own
     * after {@code doExecute} returns.
     *
     * @throws ExecuteException if {@code doExecute} fails
     */
    @Test
    public void testMergeSegmentsDoExecute_ManualGracefulShutDown() throws ExecuteException {
        getTestConfig().setProperty("kylin.engine.streaming-segment-merge-interval", "0s");
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        entry.setSparkSession(createSparkSession());

        // Background thread: wait out the poll delay, then request a stop.
        Thread stopper = new Thread(() -> {
            Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
            entry.setStopFlag(true);
        });
        stopper.start();

        String[] params = {PROJECT, DATAFLOW_ID, "32m", "3", "xx"};
        entry.parseParams(params);
        Mockito.doNothing().when(entry).process(PROJECT, DATAFLOW_ID);
        entry.doExecute();
    }

    /**
     * Runs {@code doExecute} with {@code process} stubbed to do nothing and
     * {@code isGracefulShutdown} stubbed to return {@code true} for the {@code <dataflow>_merge} id,
     * then checks that the stop flag is set.
     *
     * @throws ExecuteException if {@code doExecute} fails
     */
    @Test
    public void testMergeSegmentsDoExecute_GracefulShutDown() throws ExecuteException {
        getTestConfig().setProperty("kylin.engine.streaming-segment-merge-interval", "0s");
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        entry.setSparkSession(createSparkSession());
        final String mergeJobId = DATAFLOW_ID + "_merge";

        String[] params = {PROJECT, DATAFLOW_ID, "32m", "3", "xx"};
        entry.parseParams(params);
        Mockito.doNothing().when(entry).process(PROJECT, DATAFLOW_ID);
        Mockito.doReturn(true).when(entry).isGracefulShutdown(PROJECT, mergeJobId);

        entry.doExecute();

        Assert.assertTrue(entry.getStopFlag());
    }

    /**
     * Runs {@code doExecute} with {@code process} stubbed to throw a {@link RuntimeException} and
     * {@code isGracefulShutdown} stubbed to return {@code false}, and expects an
     * {@link ExecuteException} whose message contains the merge-error text.
     *
     * @throws ExecuteException expected from {@code doExecute}
     */
    @Test
    public void testMergeSegmentsDoExecute_KillApplication() throws ExecuteException {
        getTestConfig().setProperty("kylin.engine.streaming-segment-merge-interval", "0s");
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        entry.setSparkSession(createSparkSession());
        final String mergeJobId = DATAFLOW_ID + "_merge";

        String[] params = {PROJECT, DATAFLOW_ID, "32m", "3", "xx"};
        entry.parseParams(params);
        Mockito.doThrow(new RuntimeException()).when(entry).process(PROJECT, DATAFLOW_ID);

        thrown.expect(ExecuteException.class);
        thrown.expectMessage("streaming merging segment error occured:");
        Mockito.doReturn(false).when(entry).isGracefulShutdown(PROJECT, mergeJobId);
        entry.doExecute();
    }

    /**
     * Calls {@code doMergeStreamingSegment} with a {@code null} segment range against a mocked REST
     * support and expects a {@code null} result.
     */
    @Test
    public void testDoMergeStreamingSegment() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        mockRestSupport(entry, getTestConfig(), "new-seg-123456");

        String[] params = {PROJECT, DATAFLOW_ID, "32m", "3", "xx"};
        entry.parseParams(params);

        // Typed null so the call binds to the same method as an explicit cast would.
        SegmentRange.KafkaOffsetPartitionedSegmentRange noRange = null;
        Assert.assertNull(entry.doMergeStreamingSegment(PROJECT, DATAFLOW_ID, noRange, 1));
    }

    /**
     * Registers one segment's HDFS path with the entry, removes that segment from the dataflow, and
     * calls {@code clearHdfsFiles} with a timestamp one minute in the past. No clean-interval property
     * is set in this test, and the {@code removeSegIds} map is expected to still hold one entry
     * afterwards.
     */
    @Test
    public void testNoClearHdfsFiles() {
        // The previous version began with a bare getTestConfig() call whose result was discarded;
        // the same accessor is invoked on the next line, so that extra call has been dropped.
        NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow dataflow = dfMgr.getDataflow(DATAFLOW_ID);
        NDataSegment firstSeg = (NDataSegment) dataflow.getSegments().get(0);
        StreamingMergeEntry entry = new StreamingMergeEntry();

        entry.putHdfsFile(firstSeg.getId(),
                new Pair(dataflow.getSegmentHdfsPath(firstSeg.getId()), Long.valueOf(System.currentTimeMillis())));
        AtomicLong oneMinuteAgo = new AtomicLong(System.currentTimeMillis() - 60000);
        Assert.assertEquals(1L, ((Map) ReflectionUtils.getField(entry, "removeSegIds")).size());

        // Detach the segment from the dataflow before asking the entry to clear files.
        NDataflowUpdate dropFirst = new NDataflowUpdate(dataflow.getUuid());
        dropFirst.setToRemoveSegs(new NDataSegment[]{firstSeg});
        dfMgr.updateDataflow(dropFirst);

        entry.clearHdfsFiles(dfMgr.getDataflow(dataflow.getId()), oneMinuteAgo);
        Assert.assertEquals(1L, ((Map) ReflectionUtils.getField(entry, "removeSegIds")).size());
    }

    /**
     * With {@code kylin.engine.streaming-segment-clean-interval} set to {@code 0h}, registers one
     * segment's HDFS path, removes that segment from the dataflow, and calls {@code clearHdfsFiles}
     * with a timestamp one minute in the past. The {@code removeSegIds} map is expected to be empty
     * afterwards.
     */
    @Test
    public void testClearHdfsFiles_ClearFiles() {
        getTestConfig().setProperty("kylin.engine.streaming-segment-clean-interval", "0h");
        NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow dataflow = dfMgr.getDataflow(DATAFLOW_ID);
        NDataSegment firstSeg = (NDataSegment) dataflow.getSegments().get(0);
        StreamingMergeEntry entry = new StreamingMergeEntry();

        entry.putHdfsFile(firstSeg.getId(),
                new Pair(dataflow.getSegmentHdfsPath(firstSeg.getId()), Long.valueOf(System.currentTimeMillis())));
        AtomicLong oneMinuteAgo = new AtomicLong(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(1L));
        Assert.assertEquals(1L, ((Map) ReflectionUtils.getField(entry, "removeSegIds")).size());

        // Detach the segment from the dataflow before asking the entry to clear files.
        NDataflowUpdate dropFirst = new NDataflowUpdate(dataflow.getUuid());
        dropFirst.setToRemoveSegs(new NDataSegment[]{firstSeg});
        dfMgr.updateDataflow(dropFirst);

        entry.clearHdfsFiles(dfMgr.getDataflow(dataflow.getId()), oneMinuteAgo);
        Assert.assertEquals(0L, ((Map) ReflectionUtils.getField(entry, "removeSegIds")).size());
    }

    /**
     * With the segment clean interval set to {@code 1h} and a freshly stamped registration, removes
     * the segment from the dataflow, runs {@code clearHdfsFiles} with a start marker two hours in the
     * past and expects the entry to still be present in {@code removeSegIds}.
     */
    @Test
    public void testClearHdfsFiles_NotClearFiles() {
        getTestConfig().setProperty("kylin.engine.streaming-segment-clean-interval", "1h");
        NDataflowManager dfManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow df = dfManager.getDataflow(DATAFLOW_ID);
        NDataSegment firstSeg = (NDataSegment) df.getSegments().get(0);

        // Register the segment's HDFS path together with the current timestamp.
        StreamingMergeEntry entry = new StreamingMergeEntry();
        entry.putHdfsFile(firstSeg.getId(),
                new Pair<>(df.getSegmentHdfsPath(firstSeg.getId()), System.currentTimeMillis()));
        // Start-time marker handed to clearHdfsFiles: two hours in the past.
        AtomicLong scanStart = new AtomicLong(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2L));
        Map<?, ?> registered = (Map<?, ?>) ReflectionUtils.getField(entry, "removeSegIds");
        Assert.assertEquals(1L, registered.size());

        // Drop the segment from the dataflow before asking the entry to clean up.
        NDataflowUpdate removal = new NDataflowUpdate(df.getUuid());
        removal.setToRemoveSegs(new NDataSegment[]{firstSeg});
        dfManager.updateDataflow(removal);

        entry.clearHdfsFiles(dfManager.getDataflow(df.getId()), scanStart);
        Map<?, ?> remaining = (Map<?, ?>) ReflectionUtils.getField(entry, "removeSegIds");
        Assert.assertEquals(1L, remaining.size());
    }

    /**
     * With the segment clean interval set to {@code 1h}, registers the first segment with a timestamp
     * thirty hours in the past, removes the segment from the dataflow, runs {@code clearHdfsFiles}
     * with a start marker one hour in the past and expects {@code removeSegIds} to end up empty.
     */
    @Test
    public void testClearHdfsFiles_CleanTooOldSeg() {
        getTestConfig().setProperty("kylin.engine.streaming-segment-clean-interval", "1h");
        NDataflowManager dfManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow df = dfManager.getDataflow(DATAFLOW_ID);
        NDataSegment firstSeg = (NDataSegment) df.getSegments().get(0);

        // Register the segment's HDFS path with a timestamp thirty hours old.
        StreamingMergeEntry entry = new StreamingMergeEntry();
        entry.putHdfsFile(firstSeg.getId(), new Pair<>(df.getSegmentHdfsPath(firstSeg.getId()),
                System.currentTimeMillis() - TimeUnit.HOURS.toMillis(30L)));
        // Start-time marker handed to clearHdfsFiles: one hour in the past.
        AtomicLong scanStart = new AtomicLong(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L));
        Map<?, ?> registered = (Map<?, ?>) ReflectionUtils.getField(entry, "removeSegIds");
        Assert.assertEquals(1L, registered.size());

        // Drop the segment from the dataflow before asking the entry to clean up.
        NDataflowUpdate removal = new NDataflowUpdate(df.getUuid());
        removal.setToRemoveSegs(new NDataSegment[]{firstSeg});
        dfManager.updateDataflow(removal);

        entry.clearHdfsFiles(dfManager.getDataflow(df.getId()), scanStart);
        Map<?, ?> remaining = (Map<?, ?>) ReflectionUtils.getField(entry, "removeSegIds");
        Assert.assertEquals(0L, remaining.size());
    }

    /**
     * With the segment clean interval set to {@code 0h}, registers the first segment's HDFS path but
     * leaves the segment in the dataflow, runs {@code clearHdfsFiles} and expects the entry to still
     * be present in {@code removeSegIds}.
     */
    @Test
    public void testClearHdfsFiles_NotDeletedSegId() {
        getTestConfig().setProperty("kylin.engine.streaming-segment-clean-interval", "0h");
        NDataflowManager dfManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow df = dfManager.getDataflow(DATAFLOW_ID);
        NDataSegment firstSeg = (NDataSegment) df.getSegments().get(0);

        // Register the segment's HDFS path together with the current timestamp.
        StreamingMergeEntry entry = new StreamingMergeEntry();
        entry.putHdfsFile(firstSeg.getId(),
                new Pair<>(df.getSegmentHdfsPath(firstSeg.getId()), System.currentTimeMillis()));
        // Start-time marker handed to clearHdfsFiles: one minute in the past.
        AtomicLong scanStart = new AtomicLong(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(1L));
        Map<?, ?> registered = (Map<?, ?>) ReflectionUtils.getField(entry, "removeSegIds");
        Assert.assertEquals(1L, registered.size());

        // The segment is intentionally NOT removed from the dataflow in this scenario.
        entry.clearHdfsFiles(dfManager.getDataflow(df.getId()), scanStart);
        Map<?, ?> remaining = (Map<?, ?>) ReflectionUtils.getField(entry, "removeSegIds");
        Assert.assertEquals(1L, remaining.size());
    }

    /**
     * Smoke test: calls {@code closeAuditLogStore} on a spied entry whose {@code isJobOnCluster()}
     * is stubbed to return {@code true}. The test has no assertions; it passes if no exception
     * escapes.
     */
    @Test
    public void testCloseAuditLogStore() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        Mockito.when(entry.isJobOnCluster()).thenReturn(true);
        entry.closeAuditLogStore(createSparkSession());
    }

    /**
     * Smoke test: reflectively invokes the entry's {@code close} method with {@code false} on a
     * spied instance. The test has no assertions; it passes if no exception escapes.
     */
    @Test
    public void testCloseEntry() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        ReflectionTestUtils.invokeMethod(entry, "close", false);
    }

    /**
     * Smoke test: reflectively invokes the entry's {@code close} method with {@code true} on a
     * spied instance. The test has no assertions; it passes if no exception escapes.
     */
    @Test
    public void testCloseEntry_Error() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        ReflectionTestUtils.invokeMethod(entry, "close", true);
    }

    /**
     * Verifies that {@code reportApplicationInfo()} goes through the {@link RestSupport} returned by
     * {@code createRestSupport}: the stubbed {@code execute} records the current process id, and the
     * test compares that recorded value with the id read directly from {@link StreamingUtils}.
     */
    @Test
    public void testReportYarnApplicationInfo() {
        StreamingMergeEntry entry = Mockito.spy(new StreamingMergeEntry());
        KylinConfig config = getTestConfig();
        // Stays at 0 unless the stubbed execute() below is actually invoked.
        final AtomicLong reportedPid = new AtomicLong();
        entry.parseParams(new String[]{PROJECT, DATAFLOW_ID, "32m", "3", "xx"});
        entry.setSparkSession(createSparkSession());

        RestSupport stubbedRest = new RestSupport(config) {
            @Override
            public RestResponse execute(HttpRequestBase httpRequestBase, Object obj) {
                reportedPid.set(Long.parseLong(StreamingUtils.getProcessId()));
                return RestResponse.ok("0");
            }
        };
        // Stub with doReturn(..).when(spy): Mockito.when(spy.createRestSupport(..)) would run the real
        // createRestSupport during stubbing and leave behind a real RestSupport that is never closed.
        Mockito.doReturn(stubbedRest).when(entry).createRestSupport(config);

        String expectedPid = StreamingUtils.getProcessId();
        entry.reportApplicationInfo();
        Assert.assertEquals(expectedPid, String.valueOf(reportedPid.get()));
    }

    /**
     * Stubs {@code createRestSupport(kylinConfig)} on the given entry so that it hands out a
     * {@link RestSupport} whose {@code execute} sets the job execution id of the
     * {@code DATAFLOW_ID + "_merge"} streaming job to {@code 0} and answers with
     * {@code RestResponse.ok(t.toString())}.
     *
     * @param streamingMergeEntry the Mockito spy or mock to stub
     * @param kylinConfig the config the stub matches on and the replacement {@link RestSupport} is built with
     * @param t value whose {@code toString()} becomes the payload of the stubbed response
     */
    private <T> void mockRestSupport(StreamingMergeEntry streamingMergeEntry, KylinConfig kylinConfig, final T t) {
        RestSupport stubbedRest = new RestSupport(kylinConfig) {
            @Override
            public RestResponse execute(HttpRequestBase httpRequestBase, Object obj) {
                StreamingJobManager jobManager = StreamingJobManager
                        .getInstance(NLocalFileMetadataTestCase.getTestConfig(), StreamingMergeEntryTest.PROJECT);
                jobManager.updateStreamingJob(StreamingMergeEntryTest.DATAFLOW_ID + "_merge", streamingJobMeta -> {
                    streamingJobMeta.setJobExecutionId(0);
                });
                return RestResponse.ok(t.toString());
            }
        };
        // doReturn(..).when(..) never calls the real createRestSupport. With a spy,
        // Mockito.when(entry.createRestSupport(..)) would execute the real method while stubbing
        // and discard its result without closing it.
        Mockito.doReturn(stubbedRest).when(streamingMergeEntry).createRestSupport(kylinConfig);
    }

    /**
     * Checks {@code tryReplaceHostAddress} on two URLs: {@code localhost} is expected to come back as
     * {@code 127.0.0.1}, while the URL with host {@code unknow-host-9345} is expected to be returned
     * unchanged.
     */
    @Test
    public void tryReplaceHostAddress() {
        StreamingMergeEntry entry = new StreamingMergeEntry();

        String localhostResult = entry.tryReplaceHostAddress("http://localhost:8080");
        Assert.assertEquals("http://127.0.0.1:8080", localhostResult);

        String unknownHostUrl = "http://unknow-host-9345:8080";
        String unknownHostResult = entry.tryReplaceHostAddress(unknownHostUrl);
        Assert.assertEquals(unknownHostUrl, unknownHostResult);
    }

    /**
     * A freshly constructed entry is expected to report {@code false} from {@code isJobOnCluster()}.
     */
    @Test
    public void testIsJobOnCluster() {
        StreamingMergeEntry entry = new StreamingMergeEntry();
        Assert.assertFalse(entry.isJobOnCluster());
    }

    /**
     * Looks up the streaming-merge job of {@code DATAFLOW_ID} and expects {@code getJobParams} to
     * return a non-empty result for it.
     */
    @Test
    public void testGetJobParams() {
        StreamingMergeEntry entry = new StreamingMergeEntry();
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String mergeJobId = StreamingUtils.getJobId(DATAFLOW_ID, JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertFalse(entry.getJobParams(jobManager.getStreamingJobByUuid(mergeJobId)).isEmpty());
    }

    /**
     * Expects {@code isGracefulShutdown} to return {@code false} for the streaming-build job id
     * derived from model {@code e78a89dd-847f-4574-8afa-8768b4228b72}.
     */
    @Test
    public void testIsGracefulShutdown() {
        StreamingMergeEntry entry = new StreamingMergeEntry();
        String buildJobId = StreamingUtils.getJobId("e78a89dd-847f-4574-8afa-8768b4228b72",
                JobTypeEnum.STREAMING_BUILD.name());
        Assert.assertFalse(entry.isGracefulShutdown(PROJECT, buildJobId));
    }

    /**
     * Expects {@code createRestSupport} to return a non-null {@link RestSupport} for the test config,
     * then closes the returned instance.
     */
    @Test
    public void testCreateRestSupport() {
        StreamingMergeEntry entry = new StreamingMergeEntry();
        RestSupport rest = entry.createRestSupport(getTestConfig());
        Assert.assertNotNull(rest);
        rest.close();
    }

    /**
     * Calls the static {@code StreamingMergeEntry.stop()} and expects the {@code gracefulStop} flag,
     * read reflectively, to be {@code true} afterwards.
     */
    @Test
    public void testShutdown() {
        StreamingMergeEntry.stop();
        AtomicBoolean gracefulStop = (AtomicBoolean) ReflectionUtils.getField(StreamingMergeEntry.class,
                "gracefulStop");
        Assert.assertTrue(gracefulStop.get());
    }

    /**
     * Round-trips the stop flag: after {@code setStopFlag(true)} the getter must report {@code true},
     * and after {@code setStopFlag(false)} it must report {@code false}.
     */
    @Test
    public void testGracefulStopInterface() {
        StreamingMergeEntry entry = new StreamingMergeEntry();

        // Raise the flag and read it back.
        entry.setStopFlag(true);
        Assert.assertTrue(entry.getStopFlag());

        // Lower the flag and read it back.
        entry.setStopFlag(false);
        Assert.assertFalse(entry.getStopFlag());
    }
}
