package org.apache.pinot.plugin.minion.tasks.mergerollup;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.BaseTaskMetadata;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.class */
public class MergeRollupTaskGeneratorTest {
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
    private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
    private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
    private static final String DAILY = "daily";
    private static final String MONTHLY = "monthly";

    /**
     * Creates a config for {@code testTable} of the requested type, using the shared time column and
     * wrapping the supplied per-task-type settings in a {@link TableTaskConfig}.
     *
     * @param tableType type of the table to build
     * @param map task type name mapped to that task's key/value settings
     * @return the built table config
     */
    private TableConfig getTableConfig(TableType tableType, Map<String, Map<String, String>> map) {
        TableTaskConfig taskConfig = new TableTaskConfig(map);
        return new TableConfigBuilder(tableType)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setTaskConfig(taskConfig)
                .build();
    }

    /**
     * Exercises {@code MergeRollupTaskGenerator.validate} against five table configs and asserts which
     * ones are accepted: plain OFFLINE and plain REALTIME pass, while REFRESH ingestion, upsert and
     * dedup configs are rejected.
     */
    @Test
    public void testValidateIfMergeRollupCanBeEnabledOrNot() {
        final String taskType = "MergeRollupTask";

        // Plain OFFLINE table.
        TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .build();
        Assert.assertTrue(MergeRollupTaskGenerator.validate(offlineTable, taskType));

        // OFFLINE table whose batch ingestion type is REFRESH.
        IngestionConfig refreshIngestion = new IngestionConfig();
        refreshIngestion.setBatchIngestionConfig(new BatchIngestionConfig(Collections.emptyList(), "REFRESH", DAILY));
        TableConfig refreshTable = new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setIngestionConfig(refreshIngestion)
                .build();
        Assert.assertFalse(MergeRollupTaskGenerator.validate(refreshTable, taskType));

        // Plain REALTIME table.
        TableConfig realtimeTable = new TableConfigBuilder(TableType.REALTIME)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .build();
        Assert.assertTrue(MergeRollupTaskGenerator.validate(realtimeTable, taskType));

        // REALTIME table with full upsert.
        TableConfig upsertTable = new TableConfigBuilder(TableType.REALTIME)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
                .build();
        Assert.assertFalse(MergeRollupTaskGenerator.validate(upsertTable, taskType));

        // REALTIME table with dedup enabled.
        TableConfig dedupTable = new TableConfigBuilder(TableType.REALTIME)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setDedupConfig(new DedupConfig(true, HashFunction.MD5))
                .build();
        Assert.assertFalse(MergeRollupTaskGenerator.validate(dedupTable, taskType));
    }

    /**
     * Asserts that no tasks are generated for a REALTIME table or for an OFFLINE table with REFRESH
     * ingestion, and checks the result of {@code filterSegmentsBasedOnStatus} for both table types.
     */
    @Test
    public void testGenerateTasksCheckConfigs() {
        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(accessor.getTaskStates("MergeRollupTask")).thenReturn(new HashMap<>());

        // Two realtime segments; the first one is explicitly marked IN_PROGRESS.
        SegmentZKMetadata consumingSegment =
                getSegmentZKMetadata("testTable__0__0__0", 5000L, 50000L, TimeUnit.MILLISECONDS, null);
        consumingSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
        SegmentZKMetadata otherRealtimeSegment =
                getSegmentZKMetadata("testTable__1__0__0", 5000L, 50000L, TimeUnit.MILLISECONDS, null);
        Mockito.when(accessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
                .thenReturn(Lists.newArrayList(consumingSegment, otherRealtimeSegment));

        SegmentZKMetadata offlineSegment =
                getSegmentZKMetadata("testTable__0", 5000L, 50000L, TimeUnit.MILLISECONDS, null);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(offlineSegment));

        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);

        // REALTIME: the status filter leaves nothing and no tasks are generated.
        List<SegmentZKMetadata> realtimeCandidates = Lists.newArrayList(consumingSegment, otherRealtimeSegment);
        Assert.assertTrue(
                MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.REALTIME, realtimeCandidates).isEmpty());
        TableConfig realtimeTable = getTableConfig(TableType.REALTIME, new HashMap<>());
        Assert.assertTrue(generator.generateTasks(Lists.newArrayList(realtimeTable)).isEmpty());

        // OFFLINE: the status filter keeps the segment.
        List<SegmentZKMetadata> offlineCandidates = Lists.newArrayList(offlineSegment);
        Assert.assertFalse(
                MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.OFFLINE, offlineCandidates).isEmpty());

        // OFFLINE table with REFRESH ingestion: no tasks are generated.
        IngestionConfig refreshIngestion = new IngestionConfig();
        refreshIngestion.setBatchIngestionConfig(new BatchIngestionConfig((List) null, "REFRESH", (String) null));
        TableConfig refreshTable = getTableConfig(TableType.OFFLINE, new HashMap<>());
        refreshTable.setIngestionConfig(refreshIngestion);
        Assert.assertTrue(generator.generateTasks(Lists.newArrayList(refreshTable)).isEmpty());
    }

    /**
     * Asserts the {@code segmentName} entry of a generated task config and then delegates the remaining
     * checks to the six-argument overload.
     *
     * @param map configs of the generated task
     * @param str expected value of {@code segmentName}
     * @param str2 expected value of {@code mergeLevel}
     * @param str3 expected value of {@code mergeType}
     * @param str4 expected value of {@code partitionBucketTimePeriod}
     * @param str5 expected value of {@code roundBucketTimePeriod}
     * @param str6 expected value of {@code maxNumRecordsPerSegment}
     */
    private void checkPinotTaskConfig(Map<String, String> map, String str, String str2, String str3, String str4, String str5, String str6) {
        String actualSegmentName = map.get("segmentName");
        Assert.assertEquals(actualSegmentName, str);
        checkPinotTaskConfig(map, str2, str3, str4, str5, str6);
    }

    /**
     * Asserts the entries of a generated task config that do not depend on the segment names: the
     * table name is the OFFLINE table, segment replacement is enabled, the merge settings match the
     * expected values, and the segment name prefix starts with {@code merged_}.
     *
     * @param map configs of the generated task
     * @param str expected value of {@code mergeLevel}
     * @param str2 expected value of {@code mergeType}
     * @param str3 expected value of {@code partitionBucketTimePeriod}
     * @param str4 expected value of {@code roundBucketTimePeriod}
     * @param str5 expected value of {@code maxNumRecordsPerSegment}
     */
    private void checkPinotTaskConfig(Map<String, String> map, String str, String str2, String str3, String str4, String str5) {
        // Fixed expectations shared by every generated task in these tests.
        Assert.assertEquals(map.get("tableName"), OFFLINE_TABLE_NAME);
        String replaceSegmentsFlag = map.get("enableReplaceSegments");
        Assert.assertTrue("true".equalsIgnoreCase(replaceSegmentsFlag));

        // Caller-supplied expectations.
        Assert.assertEquals(map.get("mergeLevel"), str);
        Assert.assertEquals(map.get("mergeType"), str2);
        Assert.assertEquals(map.get("partitionBucketTimePeriod"), str3);
        Assert.assertEquals(map.get("roundBucketTimePeriod"), str4);
        Assert.assertEquals(map.get("maxNumRecordsPerSegment"), str5);

        String segmentNamePrefix = map.get("segmentNamePrefix");
        Assert.assertTrue(segmentNamePrefix.startsWith("merged_"));
    }

    /**
     * Backs the mocked accessor's task-metadata setter and getter with an in-memory map so that
     * metadata written through {@code setMinionTaskMetadata} can be read back through
     * {@code getMinionTaskMetadataZNRecord}.
     *
     * @param clusterInfoAccessor the Mockito mock to stub
     */
    private void mockMergeRollupTaskMetadataGetterAndSetter(ClusterInfoAccessor clusterInfoAccessor) {
        // Stored metadata, keyed by table name with type.
        Map<String, MergeRollupTaskMetadata> metadataByTable = new HashMap<>();

        // Setter: remember the metadata passed as the first argument (ignored when absent or null).
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            boolean hasMetadata = args != null && args.length > 0 && args[0] != null;
            if (hasMetadata) {
                MergeRollupTaskMetadata metadata = (MergeRollupTaskMetadata) args[0];
                metadataByTable.put(metadata.getTableNameWithType(), metadata);
            }
            return null;
        }).when(clusterInfoAccessor).setMinionTaskMetadata(
                Mockito.any(MergeRollupTaskMetadata.class), ArgumentMatchers.eq("MergeRollupTask"), ArgumentMatchers.anyInt());

        // Getter: look up by the second argument and return its ZNRecord, or null when nothing is stored.
        Mockito.when(clusterInfoAccessor.getMinionTaskMetadataZNRecord(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()))
                .thenAnswer(invocation -> {
                    Object[] args = invocation.getArguments();
                    if (args == null || args.length != 2 || args[1] == null) {
                        return null;
                    }
                    String tableNameWithType = (String) args[1];
                    MergeRollupTaskMetadata stored = metadataByTable.get(tableNameWithType);
                    return stored != null ? stored.toZNRecord() : null;
                });
    }

    /**
     * Asserts that a table with no segments produces no tasks and that no task metadata is written.
     */
    @Test
    public void testEmptyTable() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig offlineTable = getTableConfig(TableType.OFFLINE, taskConfigs);

        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(Collections.emptyList()));
        mockMergeRollupTaskMetadataGetterAndSetter(accessor);

        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);
        List<PinotTaskConfig> tasks = generator.generateTasks(Lists.newArrayList(offlineTable));

        Assert.assertNull(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME));
        Assert.assertEquals(tasks.size(), 0);
    }

    /**
     * Asserts that a table whose only segment has zero documents produces no tasks.
     */
    @Test
    public void testEmptySegment() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig offlineTable = getTableConfig(TableType.OFFLINE, taskConfigs);

        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        long now = System.currentTimeMillis();
        SegmentZKMetadata emptySegment =
                getSegmentZKMetadata("testTable__1", now - 500000, now, TimeUnit.MILLISECONDS, null);
        emptySegment.setTotalDocs(0L);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(emptySegment));

        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);
        List<PinotTaskConfig> tasks = generator.generateTasks(Lists.newArrayList(offlineTable));

        // NOTE(review): the metadata getter is not stubbed in this test, so the mock presumably returns
        // null here regardless of what the generator did - confirm whether this assertion is meaningful.
        Assert.assertNull(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME));
        Assert.assertEquals(tasks.size(), 0);
    }

    /**
     * Asserts that a segment whose time range ends at the current time produces no tasks when the
     * daily level is configured with a {@code 1d} buffer period.
     */
    @Test
    public void testBufferTime() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig offlineTable = getTableConfig(TableType.OFFLINE, taskConfigs);

        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        long now = System.currentTimeMillis();
        SegmentZKMetadata recentSegment =
                getSegmentZKMetadata("testTable__1", now - 500000, now, TimeUnit.MILLISECONDS, null);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(recentSegment));

        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);
        List<PinotTaskConfig> tasks = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(tasks.size(), 0);
    }

    /**
     * Checks how segments are grouped into tasks when {@code daily.maxNumRecordsPerTask} is set to
     * 5,000,000: two segments with 2M and 4M docs share one task, and a further 5M-doc segment is
     * placed in a second task.
     */
    @Test
    public void testMaxNumRecordsPerTask() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "2d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        mergeRollupConfigs.put("daily.maxNumRecordsPerTask", "5000000");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig offlineTable = getTableConfig(TableType.OFFLINE, taskConfigs);

        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        SegmentZKMetadata segment1 =
                getSegmentZKMetadata("testTable__1", 86400000L, 90000000L, TimeUnit.MILLISECONDS, "download1");
        segment1.setTotalDocs(2000000L);
        SegmentZKMetadata segment2 =
                getSegmentZKMetadata("testTable__2", 86400000L, 100000000L, TimeUnit.MILLISECONDS, "download2");
        segment2.setTotalDocs(4000000L);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2));

        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);

        // Round 1: both segments end up in a single task, download URLs joined in the same order.
        List<PinotTaskConfig> firstRound = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(firstRound.size(), 1);
        Map<String, String> onlyTask = firstRound.get(0).getConfigs();
        checkPinotTaskConfig(onlyTask, "testTable__1,testTable__2", DAILY, "concat", "1d", null, "1000000");
        Assert.assertEquals(firstRound.get(0).getConfigs().get("downloadURL"), "download1,download2");

        // Round 2: a third segment in the same bucket is split off into its own task.
        SegmentZKMetadata segment3 =
                getSegmentZKMetadata("testTable__3", 86400000L, 110000000L, TimeUnit.MILLISECONDS, null);
        segment3.setTotalDocs(5000000L);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3));
        List<PinotTaskConfig> secondRound = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(secondRound.size(), 2);
        checkPinotTaskConfig(secondRound.get(0).getConfigs(), "testTable__1,testTable__2", DAILY, "concat", "1d", null,
                "1000000");
        checkPinotTaskConfig(secondRound.get(1).getConfigs(), "testTable__3", DAILY, "concat", "1d", null, "1000000");
    }

    /**
     * Checks task generation with {@code maxNumParallelBuckets} set to 3 across five scenarios:
     * consecutive buckets, a segment spanning two buckets, a missing bucket, already-merged segments,
     * and a monthly level added on top of the daily one.
     */
    @Test
    public void testNumParallelBuckets() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "2d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        mergeRollupConfigs.put("daily.maxNumParallelBuckets", "3");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig offlineTable = getTableConfig(TableType.OFFLINE, taskConfigs);

        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        SegmentZKMetadata segment1 =
                getSegmentZKMetadata("testTable__1", 86400000L, 90000000L, TimeUnit.MILLISECONDS, "download1");
        SegmentZKMetadata segment2 =
                getSegmentZKMetadata("testTable__2", 86400000L, 100000000L, TimeUnit.MILLISECONDS, "download2");
        SegmentZKMetadata segment3 =
                getSegmentZKMetadata("testTable__3", 172800000L, 173000000L, TimeUnit.MILLISECONDS, "download3");
        SegmentZKMetadata segment4 =
                getSegmentZKMetadata("testTable__4", 259200000L, 260000000L, TimeUnit.MILLISECONDS, "download4");
        SegmentZKMetadata segment5 =
                getSegmentZKMetadata("testTable__5", 345600000L, 346000000L, TimeUnit.MILLISECONDS, "download5");
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5));

        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);

        // Scenario 1: five segments over four daily buckets; three tasks are expected.
        List<PinotTaskConfig> consecutiveBuckets = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(consecutiveBuckets.size(), 3);
        checkPinotTaskConfig(consecutiveBuckets.get(0).getConfigs(), "testTable__1,testTable__2", DAILY, "concat", "1d",
                null, "1000000");
        checkPinotTaskConfig(consecutiveBuckets.get(1).getConfigs(), "testTable__3", DAILY, "concat", "1d", null,
                "1000000");
        checkPinotTaskConfig(consecutiveBuckets.get(2).getConfigs(), "testTable__4", DAILY, "concat", "1d", null,
                "1000000");

        // Scenario 2: an extra segment whose time range spans the second and third buckets; two tasks are expected.
        SegmentZKMetadata spanningSegment =
                getSegmentZKMetadata("testTable__6", 172800000L, 260000000L, TimeUnit.MILLISECONDS, null);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5, spanningSegment));
        List<PinotTaskConfig> withSpanningSegment = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(withSpanningSegment.size(), 2);
        checkPinotTaskConfig(withSpanningSegment.get(0).getConfigs(), "testTable__1,testTable__2", DAILY, "concat", "1d",
                null, "1000000");
        checkPinotTaskConfig(withSpanningSegment.get(1).getConfigs(), "testTable__3,testTable__6", DAILY, "concat", "1d",
                null, "1000000");

        // Scenario 3: segment 3 removed, leaving its bucket empty; three tasks are still expected.
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment4, segment5));
        List<PinotTaskConfig> withMissingBucket = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(withMissingBucket.size(), 3);
        checkPinotTaskConfig(withMissingBucket.get(0).getConfigs(), "testTable__1,testTable__2", DAILY, "concat", "1d",
                null, "1000000");
        checkPinotTaskConfig(withMissingBucket.get(1).getConfigs(), "testTable__4", DAILY, "concat", "1d", null,
                "1000000");
        checkPinotTaskConfig(withMissingBucket.get(2).getConfigs(), "testTable__5", DAILY, "concat", "1d", null,
                "1000000");

        // Scenario 4: segments 1, 2 and 4 carry the daily merge-level marker; tasks cover 3, 5 and a new segment 6.
        SegmentZKMetadata laterSegment6 =
                getSegmentZKMetadata("testTable__6", 432000000L, 432100000L, TimeUnit.MILLISECONDS, null);
        Map<String, String> dailyMergedMarker = ImmutableMap.of("MergeRollupTask.mergeLevel", DAILY);
        segment1.setCustomMap(dailyMergedMarker);
        segment2.setCustomMap(dailyMergedMarker);
        segment4.setCustomMap(dailyMergedMarker);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5, laterSegment6));
        List<PinotTaskConfig> withMergedSegments = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(withMergedSegments.size(), 3);
        checkPinotTaskConfig(withMergedSegments.get(0).getConfigs(), "testTable__3", DAILY, "concat", "1d", null,
                "1000000");
        checkPinotTaskConfig(withMergedSegments.get(1).getConfigs(), "testTable__5", DAILY, "concat", "1d", null,
                "1000000");
        checkPinotTaskConfig(withMergedSegments.get(2).getConfigs(), "testTable__6", DAILY, "concat", "1d", null,
                "1000000");

        // Scenario 5: add a monthly level to the same config map and stub a daily watermark of 30 days.
        mergeRollupConfigs.put("monthly.mergeType", "concat");
        mergeRollupConfigs.put("monthly.bufferTimePeriod", "30d");
        mergeRollupConfigs.put("monthly.bucketTimePeriod", "30d");
        mergeRollupConfigs.put("monthly.maxNumRecordsPerSegment", "1000000");
        mergeRollupConfigs.put("monthly.maxNumParallelBuckets", "3");
        TreeMap<String, Long> watermarks = new TreeMap<>();
        watermarks.put(DAILY, 2592000000L);
        Mockito.when(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME))
                .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, watermarks).toZNRecord());
        SegmentZKMetadata segment7 =
                getSegmentZKMetadata("testTable__7", 86400000L, 90000000L, TimeUnit.MILLISECONDS, "download7");
        SegmentZKMetadata segment8 =
                getSegmentZKMetadata("testTable__8", 2592000000L, 2600000000L, TimeUnit.MILLISECONDS, "download8");
        segment7.setCustomMap(ImmutableMap.of("MergeRollupTask.mergeLevel", DAILY));
        segment8.setCustomMap(ImmutableMap.of("MergeRollupTask.mergeLevel", DAILY));
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment7, segment8));
        List<PinotTaskConfig> monthlyTasks = generator.generateTasks(Lists.newArrayList(offlineTable));
        Assert.assertEquals(monthlyTasks.size(), 1);
        checkPinotTaskConfig(monthlyTasks.get(0).getConfigs(), "testTable__7", MONTHLY, "concat", "30d", null,
                "1000000");
    }

    /**
     * Checks that segments of a partitioned table are grouped into tasks per partition, first without
     * and then with a {@code maxNumRecordsPerTask} limit.
     *
     * <p>Task order is not asserted. Every generated task must match one of the expected segment
     * groups, every expected group must be present, and the common configs of each task are validated.
     */
    @Test
    public void testPartitionedTable() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "2d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig partitionedTable = new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setSegmentPartitionConfig(
                        new SegmentPartitionConfig(Collections.singletonMap("memberId", new ColumnPartitionConfig("murmur", 10))))
                .setTaskConfig(new TableTaskConfig(taskConfigs))
                .build();

        // Segments 1 and 2 belong to partition 0, segments 3 and 4 to partition 1.
        SegmentZKMetadata segment1 =
                getSegmentZKMetadata("testTable__1", 86400000L, 90000000L, TimeUnit.MILLISECONDS, null);
        segment1.setPartitionMetadata(memberIdPartitionMetadata(0));
        SegmentZKMetadata segment2 =
                getSegmentZKMetadata("testTable__2", 86400000L, 100000000L, TimeUnit.MILLISECONDS, null);
        segment2.setPartitionMetadata(memberIdPartitionMetadata(0));
        SegmentZKMetadata segment3 =
                getSegmentZKMetadata("testTable__3", 86400000L, 110000000L, TimeUnit.MILLISECONDS, null);
        segment3.setPartitionMetadata(memberIdPartitionMetadata(1));
        SegmentZKMetadata segment4 =
                getSegmentZKMetadata("testTable__4", 90000000L, 110000000L, TimeUnit.MILLISECONDS, null);
        segment4.setPartitionMetadata(memberIdPartitionMetadata(1));

        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4));
        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);

        // Round 1: no per-task record limit, so one task per partition is expected.
        List<PinotTaskConfig> tasksPerPartition = generator.generateTasks(Lists.newArrayList(partitionedTable));
        Assert.assertEquals(tasksPerPartition.size(), 2);
        String partition0Segments = "testTable__1,testTable__2";
        String partition1Segments = "testTable__3,testTable__4";
        boolean sawPartition0 = false;
        boolean sawPartition1 = false;
        for (PinotTaskConfig taskConfig : tasksPerPartition) {
            Map<String, String> configs = taskConfig.getConfigs();
            String segmentNames = configs.get("segmentName");
            boolean isPartition0 = partition0Segments.equals(segmentNames);
            boolean isPartition1 = partition1Segments.equals(segmentNames);
            Assert.assertTrue(isPartition0 || isPartition1, "Unexpected segment group: " + segmentNames);
            sawPartition0 |= isPartition0;
            sawPartition1 |= isPartition1;
            // Validate the task being iterated rather than a fixed index.
            checkPinotTaskConfig(configs, DAILY, "concat", "1d", null, "1000000");
        }
        Assert.assertTrue(sawPartition0 && sawPartition1);

        // Round 2: the limit is added to the map already held by the table config.
        mergeRollupConfigs.put("daily.maxNumRecordsPerTask", "5000000");
        segment1.setTotalDocs(2000000L);
        segment2.setTotalDocs(4000000L);
        segment3.setTotalDocs(5000000L);
        segment4.setTotalDocs(6000000L);
        List<PinotTaskConfig> tasksWithLimit = generator.generateTasks(Lists.newArrayList(partitionedTable));
        Assert.assertEquals(tasksWithLimit.size(), 3);
        boolean sawPartition0Group = false;
        boolean sawSegment3 = false;
        boolean sawSegment4 = false;
        for (PinotTaskConfig taskConfig : tasksWithLimit) {
            Map<String, String> configs = taskConfig.getConfigs();
            String segmentNames = configs.get("segmentName");
            boolean isPartition0Group = partition0Segments.equals(segmentNames);
            boolean isSegment3 = "testTable__3".equals(segmentNames);
            boolean isSegment4 = "testTable__4".equals(segmentNames);
            Assert.assertTrue(isPartition0Group || isSegment3 || isSegment4, "Unexpected segment group: " + segmentNames);
            sawPartition0Group |= isPartition0Group;
            sawSegment3 |= isSegment3;
            sawSegment4 |= isSegment4;
            // Validate the task being iterated rather than a fixed index.
            checkPinotTaskConfig(configs, DAILY, "concat", "1d", null, "1000000");
        }
        Assert.assertTrue(sawPartition0Group && sawSegment3 && sawSegment4);
    }

    /** Builds partition metadata placing a segment in one {@code murmur}/10 partition of {@code memberId}. */
    private SegmentPartitionMetadata memberIdPartitionMetadata(int partitionId) {
        return new SegmentPartitionMetadata(Collections.singletonMap("memberId",
                new ColumnPartitionMetadata("murmur", 10, Collections.singleton(partitionId), (Map) null)));
    }

    /**
     * Runs three generation rounds against an in-memory metadata store and asserts the daily watermark
     * after each: 86400000 after the first round, 345600000 after the second, and unchanged with no
     * tasks after the third.
     */
    @Test
    public void testUpdateWatermark() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "2d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        mergeRollupConfigs.put("daily.maxNumRecordsPerTask", "5000000");
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig offlineTable = getTableConfig(TableType.OFFLINE, taskConfigs);

        SegmentZKMetadata segment1 =
                getSegmentZKMetadata("testTable__1", 90000000L, 100000000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata segment2 =
                getSegmentZKMetadata("testTable__2", 345600000L, 400000000L, TimeUnit.MILLISECONDS, null);
        ClusterInfoAccessor accessor = Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(accessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2));
        mockMergeRollupTaskMetadataGetterAndSetter(accessor);
        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(accessor);

        // Round 1: nothing merged yet; the task targets segment 1.
        List<PinotTaskConfig> firstRound = generator.generateTasks(Lists.newArrayList(offlineTable));
        long watermarkAfterFirst = MergeRollupTaskMetadata
                .fromZNRecord(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME))
                .getWatermarkMap()
                .get(DAILY);
        Assert.assertEquals(watermarkAfterFirst, 86400000L);
        Assert.assertEquals(firstRound.size(), 1);
        checkPinotTaskConfig(firstRound.get(0).getConfigs(), "testTable__1", DAILY, "concat", "1d", null, "1000000");

        // Round 2: segment 1 is marked as merged at the daily level; the task targets segment 2.
        TreeMap<String, Long> watermarks = new TreeMap<>();
        watermarks.put(DAILY, 86400000L);
        accessor.setMinionTaskMetadata(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, watermarks), "MergeRollupTask", -1);
        segment1.setCustomMap(ImmutableMap.of("MergeRollupTask.mergeLevel", DAILY));
        List<PinotTaskConfig> secondRound = generator.generateTasks(Lists.newArrayList(offlineTable));
        long watermarkAfterSecond = MergeRollupTaskMetadata
                .fromZNRecord(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME))
                .getWatermarkMap()
                .get(DAILY);
        Assert.assertEquals(watermarkAfterSecond, 345600000L);
        Assert.assertEquals(secondRound.size(), 1);
        checkPinotTaskConfig(secondRound.get(0).getConfigs(), "testTable__2", DAILY, "concat", "1d", null, "1000000");

        // Round 3: segment 2 is marked as merged too; no tasks and the watermark stays put.
        watermarks.put(DAILY, 345600000L);
        accessor.setMinionTaskMetadata(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, watermarks), "MergeRollupTask", -1);
        segment2.setCustomMap(ImmutableMap.of("MergeRollupTask.mergeLevel", DAILY));
        List<PinotTaskConfig> thirdRound = generator.generateTasks(Lists.newArrayList(offlineTable));
        long watermarkAfterThird = MergeRollupTaskMetadata
                .fromZNRecord(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME))
                .getWatermarkMap()
                .get(DAILY);
        Assert.assertEquals(watermarkAfterThird, 345600000L);
        Assert.assertEquals(thirdRound.size(), 0);
    }

    /**
     * Checks how task generation reacts to a MergeRollupTask that already exists for the same table, merge level
     * and segment ({@code testTable__1}). Three scenarios run against one mocked {@link ClusterInfoAccessor}:
     * <ol>
     *   <li>a task named with the current timestamp is {@code IN_PROGRESS}: no task is generated;</li>
     *   <li>the only {@code IN_PROGRESS} task is named with a timestamp three days in the past: one task for
     *       {@code testTable__1} is generated;</li>
     *   <li>the current-timestamp task is {@code COMPLETED} and the lineage records {@code testTable__1} as
     *       replaced by {@code merged_testTable__1}: one task for {@code testTable__2} is generated.</li>
     * </ol>
     */
    @Test
    public void testIncompleteTask() {
        // Table config with a single "daily" concat merge level.
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "2d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        mergeRollupConfigs.put("daily.maxNumRecordsPerTask", "5000000");
        Map<String, Map<String, String>> taskConfigsByType = new HashMap<>();
        taskConfigsByType.put("MergeRollupTask", mergeRollupConfigs);
        TableConfig tableConfig = getTableConfig(TableType.OFFLINE, taskConfigsByType);

        SegmentZKMetadata segment1 =
                getSegmentZKMetadata("testTable__1", 90_000_000L, 100_000_000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata segment2 =
                getSegmentZKMetadata("testTable__2", 345_600_000L, 400_000_000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata mergedSegment1 =
                getSegmentZKMetadata("merged_testTable__1", 90_000_000L, 100_000_000L, TimeUnit.MILLISECONDS, null);

        ClusterInfoAccessor clusterInfoAccessor = Mockito.mock(ClusterInfoAccessor.class);

        // Pre-existing task metadata: the daily watermark sits at 86_400_000 ms (one day).
        Map<String, Long> watermarks = new TreeMap<>();
        watermarks.put(DAILY, 86_400_000L);
        Mockito.when(clusterInfoAccessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME))
                .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, watermarks).toZNRecord());

        // The task-state map is handed to the mock by reference, so later put/remove calls are seen by the generator.
        Map<String, TaskState> taskStates = new HashMap<>();
        String recentTaskName = "Task_MergeRollupTask_" + System.currentTimeMillis();
        Map<String, String> existingTaskConfigs = new HashMap<>();
        existingTaskConfigs.put("tableName", OFFLINE_TABLE_NAME);
        existingTaskConfigs.put("mergeLevel", DAILY);
        existingTaskConfigs.put("segmentName", "testTable__1");
        Mockito.when(clusterInfoAccessor.getTaskStates("MergeRollupTask")).thenReturn(taskStates);
        Mockito.when(clusterInfoAccessor.getTaskConfigs(recentTaskName))
                .thenReturn(Lists.newArrayList(new PinotTaskConfig("MergeRollupTask", existingTaskConfigs)));

        // Scenario 1: a task with a current timestamp is IN_PROGRESS -> nothing new is generated.
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2));
        taskStates.put(recentTaskName, TaskState.IN_PROGRESS);
        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(clusterInfoAccessor);
        Assert.assertTrue(generator.generateTasks(Lists.newArrayList(tableConfig)).isEmpty());

        // Scenario 2: the only IN_PROGRESS task carries a timestamp three days in the past -> a task is generated.
        // The segment list is re-stubbed with a fresh list instance, as in the original sequence.
        // NOTE(review): getTaskConfigs is never stubbed for oldTaskName, so the mock's default return value applies
        // for it; confirm this scenario passes because of the task's age and not only because of missing configs.
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2));
        String oldTaskName = "Task_MergeRollupTask_" + (System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3L));
        taskStates.remove(recentTaskName);
        taskStates.put(oldTaskName, TaskState.IN_PROGRESS);
        List<PinotTaskConfig> tasksWithOldTask = generator.generateTasks(Lists.newArrayList(tableConfig));
        Assert.assertEquals(tasksWithOldTask.size(), 1);
        checkPinotTaskConfig(tasksWithOldTask.get(0).getConfigs(), "testTable__1", DAILY, "concat", "1d", null,
                "1000000");

        // Scenario 3: the recent task is COMPLETED and the lineage marks testTable__1 as replaced by the merged
        // segment -> the next task targets testTable__2. The old task is still IN_PROGRESS in the state map.
        mergedSegment1.setCustomMap(ImmutableMap.of("MergeRollupTask.mergeLevel", DAILY));
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, mergedSegment1));
        SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
        segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
                new LineageEntry(Collections.singletonList("testTable__1"),
                        Collections.singletonList("merged_testTable__1"), LineageEntryState.COMPLETED, 11111L));
        Mockito.when(clusterInfoAccessor.getSegmentLineage(OFFLINE_TABLE_NAME)).thenReturn(segmentLineage);
        taskStates.put(recentTaskName, TaskState.COMPLETED);
        List<PinotTaskConfig> tasksAfterCompletion = generator.generateTasks(Lists.newArrayList(tableConfig));
        Assert.assertEquals(tasksAfterCompletion.size(), 1);
        checkPinotTaskConfig(tasksAfterCompletion.get(0).getConfigs(), "testTable__2", DAILY, "concat", "1d", null,
                "1000000");
    }

    /**
     * Drives four consecutive {@code generateTasks} rounds for a table configured with a "daily" concat level and a
     * "monthly" rollup level, feeding the outcome of each round (merged segments, lineage entries, completed task
     * states) back into the mocked {@link ClusterInfoAccessor} before the next one:
     * <ol>
     *   <li>daily watermark 86_400_000; one daily task for {@code testTable__1..3};</li>
     *   <li>daily watermark 2_505_600_000; one daily task for {@code testTable__4};</li>
     *   <li>daily watermark 2_592_000_000, monthly watermark 0; one daily task for
     *       {@code merged_testTable__4_2,testTable__5} and one monthly task for
     *       {@code merged_testTable__1__2__3,merged_testTable__4_1};</li>
     *   <li>watermarks unchanged; no task is generated.</li>
     * </ol>
     */
    @Test
    public void testSegmentSelectionMultiLevels() {
        final String taskType = "MergeRollupTask";
        final String mergeLevelKey = "MergeRollupTask.mergeLevel";
        final String segmentName1 = "testTable__1";
        final String segmentName2 = "testTable__2";
        final String segmentName3 = "testTable__3";
        final String segmentName4 = "testTable__4";
        final String segmentName5 = "testTable__5";
        final String mergedName123 = "merged_testTable__1__2__3";
        final String mergedName4Part1 = "merged_testTable__4_1";
        final String mergedName4Part2 = "merged_testTable__4_2";
        final String mergedName4Part2And5 = "merged_testTable__4_2__5";
        final String mergedNameMonthly = "merged_testTable__1__2__3__4_1";

        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("daily.mergeType", "concat");
        mergeRollupConfigs.put("daily.bufferTimePeriod", "2d");
        mergeRollupConfigs.put("daily.bucketTimePeriod", "1d");
        mergeRollupConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
        mergeRollupConfigs.put("daily.maxNumRecordsPerTask", "5000000");
        mergeRollupConfigs.put("monthly.mergeType", "rollup");
        mergeRollupConfigs.put("monthly.bufferTimePeriod", "30d");
        mergeRollupConfigs.put("monthly.bucketTimePeriod", "30d");
        mergeRollupConfigs.put("monthly.roundBucketTimePeriod", "30d");
        mergeRollupConfigs.put("monthly.maxNumRecordsPerSegment", "2000000");
        mergeRollupConfigs.put("monthly.maxNumRecordsPerTask", "5000000");
        Map<String, Map<String, String>> taskConfigsByType = new HashMap<>();
        taskConfigsByType.put(taskType, mergeRollupConfigs);
        TableConfig tableConfig = getTableConfig(TableType.OFFLINE, taskConfigsByType);

        // Three segments starting at day 1 (86_400_000 ms), one starting at day 29 that ends just past day 30, and
        // one starting exactly at day 30 (2_592_000_000 ms).
        SegmentZKMetadata segment1 =
                getSegmentZKMetadata(segmentName1, 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata segment2 =
                getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata segment3 =
                getSegmentZKMetadata(segmentName3, 86_400_000L, 110_000_000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata segment4 =
                getSegmentZKMetadata(segmentName4, 2_505_600_000L, 2_592_010_000L, TimeUnit.MILLISECONDS, null);
        SegmentZKMetadata segment5 =
                getSegmentZKMetadata(segmentName5, 2_592_000_000L, 2_592_020_000L, TimeUnit.MILLISECONDS, null);

        ClusterInfoAccessor clusterInfoAccessor = Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5));
        mockMergeRollupTaskMetadataGetterAndSetter(clusterInfoAccessor);
        MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
        generator.init(clusterInfoAccessor);

        // Round 1: only the day-1 bucket is scheduled, at the daily level.
        List<PinotTaskConfig> firstRound = generator.generateTasks(Lists.newArrayList(tableConfig));
        Assert.assertEquals(readWatermarkMs(clusterInfoAccessor, DAILY), 86_400_000L);
        Assert.assertEquals(firstRound.size(), 1);
        Map<String, String> firstDailyConfigs = firstRound.get(0).getConfigs();
        checkPinotTaskConfig(firstDailyConfigs, String.join(",", segmentName1, segmentName2, segmentName3), DAILY,
                "concat", "1d", null, "1000000");

        // Record round 1 as done: merged segment present, lineage entry COMPLETED, task COMPLETED.
        SegmentZKMetadata merged123 =
                getSegmentZKMetadata(mergedName123, 86_400_000L, 110_000_000L, TimeUnit.MILLISECONDS, null);
        merged123.setCustomMap(ImmutableMap.of(mergeLevelKey, DAILY));
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5, merged123));
        SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
        segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
                new LineageEntry(Arrays.asList(segmentName1, segmentName2, segmentName3),
                        Collections.singletonList(mergedName123), LineageEntryState.COMPLETED, 11111L));
        Mockito.when(clusterInfoAccessor.getSegmentLineage(OFFLINE_TABLE_NAME)).thenReturn(segmentLineage);
        // The state map is shared with the mock by reference; later puts are visible to the generator.
        Map<String, TaskState> taskStates = new HashMap<>();
        taskStates.put("Task_MergeRollupTask_1", TaskState.COMPLETED);
        Mockito.when(clusterInfoAccessor.getTaskStates(taskType)).thenReturn(taskStates);
        Mockito.when(clusterInfoAccessor.getTaskConfigs("Task_MergeRollupTask_1"))
                .thenReturn(Lists.newArrayList(new PinotTaskConfig(taskType, firstDailyConfigs)));

        // Round 2: the daily watermark jumps to the start of testTable__4, which is scheduled on its own.
        List<PinotTaskConfig> secondRound = generator.generateTasks(Lists.newArrayList(tableConfig));
        Assert.assertEquals(readWatermarkMs(clusterInfoAccessor, DAILY), 2_505_600_000L);
        Assert.assertEquals(secondRound.size(), 1);
        Map<String, String> secondDailyConfigs = secondRound.get(0).getConfigs();
        checkPinotTaskConfig(secondDailyConfigs, segmentName4, DAILY, "concat", "1d", null, "1000000");

        // Record round 2 as done: testTable__4 yields two merged segments, split at the day-30 boundary.
        SegmentZKMetadata merged4Part1 =
                getSegmentZKMetadata(mergedName4Part1, 2_505_600_000L, 2_591_999_999L, TimeUnit.MILLISECONDS, null);
        merged4Part1.setCustomMap(ImmutableMap.of(mergeLevelKey, DAILY));
        SegmentZKMetadata merged4Part2 =
                getSegmentZKMetadata(mergedName4Part2, 2_592_000_000L, 2_592_010_000L, TimeUnit.MILLISECONDS, null);
        merged4Part2.setCustomMap(ImmutableMap.of(mergeLevelKey, DAILY));
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5, merged123,
                        merged4Part1, merged4Part2));
        // NOTE(review): the "to" side lists merged_testTable__1__2__3 rather than merged_testTable__4_2; this looks
        // like a possible slip but is kept as-is to preserve the test's behavior - confirm the intended lineage.
        segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
                new LineageEntry(Collections.singletonList(segmentName4),
                        Arrays.asList(mergedName123, mergedName4Part1), LineageEntryState.COMPLETED, 11111L));
        taskStates.put("Task_MergeRollupTask_2", TaskState.COMPLETED);
        Mockito.when(clusterInfoAccessor.getTaskConfigs("Task_MergeRollupTask_2"))
                .thenReturn(Lists.newArrayList(new PinotTaskConfig(taskType, secondDailyConfigs)));

        // Round 3: a daily task and a monthly task are produced in the same call, daily first.
        List<PinotTaskConfig> thirdRound = generator.generateTasks(Lists.newArrayList(tableConfig));
        Assert.assertEquals(readWatermarkMs(clusterInfoAccessor, DAILY), 2_592_000_000L);
        Assert.assertEquals(readWatermarkMs(clusterInfoAccessor, MONTHLY), 0L);
        Assert.assertEquals(thirdRound.size(), 2);
        Map<String, String> thirdDailyConfigs = thirdRound.get(0).getConfigs();
        Map<String, String> monthlyConfigs = thirdRound.get(1).getConfigs();
        checkPinotTaskConfig(thirdDailyConfigs, String.join(",", mergedName4Part2, segmentName5), DAILY, "concat",
                "1d", null, "1000000");
        checkPinotTaskConfig(monthlyConfigs, String.join(",", mergedName123, mergedName4Part1), MONTHLY, "rollup",
                "30d", "30d", "2000000");

        // Record round 3 as done for both levels.
        SegmentZKMetadata merged4Part2And5 = getSegmentZKMetadata(mergedName4Part2And5, 2_592_000_000L,
                2_592_020_000L, TimeUnit.MILLISECONDS, null);
        merged4Part2And5.setCustomMap(ImmutableMap.of(mergeLevelKey, DAILY));
        SegmentZKMetadata mergedMonthly =
                getSegmentZKMetadata(mergedNameMonthly, 86_400_000L, 2_591_999_999L, TimeUnit.MILLISECONDS, null);
        mergedMonthly.setCustomMap(ImmutableMap.of(mergeLevelKey, MONTHLY));
        Mockito.when(clusterInfoAccessor.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
                .thenReturn(Lists.newArrayList(segment1, segment2, segment3, segment4, segment5, merged123,
                        merged4Part1, merged4Part2, merged4Part2And5, mergedMonthly));
        segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
                new LineageEntry(Arrays.asList(mergedName4Part2, segmentName5),
                        Collections.singletonList(mergedName4Part2And5), LineageEntryState.COMPLETED, 11111L));
        segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
                new LineageEntry(Arrays.asList(mergedName123, mergedName4Part1),
                        Collections.singletonList(mergedNameMonthly), LineageEntryState.COMPLETED, 11111L));
        taskStates.put("Task_MergeRollupTask_3", TaskState.COMPLETED);
        Mockito.when(clusterInfoAccessor.getTaskConfigs("Task_MergeRollupTask_3"))
                .thenReturn(Lists.newArrayList(new PinotTaskConfig(taskType, thirdDailyConfigs)));
        taskStates.put("Task_MergeRollupTask_4", TaskState.COMPLETED);
        Mockito.when(clusterInfoAccessor.getTaskConfigs("Task_MergeRollupTask_4"))
                .thenReturn(Lists.newArrayList(new PinotTaskConfig(taskType, monthlyConfigs)));

        // Round 4: nothing is left to schedule and both watermarks stay where they were.
        List<PinotTaskConfig> fourthRound = generator.generateTasks(Lists.newArrayList(tableConfig));
        Assert.assertEquals(readWatermarkMs(clusterInfoAccessor, DAILY), 2_592_000_000L);
        Assert.assertEquals(readWatermarkMs(clusterInfoAccessor, MONTHLY), 0L);
        Assert.assertEquals(fourthRound.size(), 0);
    }

    /**
     * Reads the watermark stored for {@code mergeLevel} in the MergeRollupTask metadata record that
     * {@code accessor} returns for {@code OFFLINE_TABLE_NAME}.
     */
    private long readWatermarkMs(ClusterInfoAccessor accessor, String mergeLevel) {
        return MergeRollupTaskMetadata
                .fromZNRecord(accessor.getMinionTaskMetadataZNRecord("MergeRollupTask", OFFLINE_TABLE_NAME))
                .getWatermarkMap()
                .get(mergeLevel)
                .longValue();
    }

    /**
     * Builds a {@link SegmentZKMetadata} test fixture. Every field except the total document count comes from the
     * arguments; the document count is fixed at 1000.
     *
     * @param str segment name passed to the {@link SegmentZKMetadata} constructor
     * @param j value handed to {@code setStartTime}
     * @param j2 value handed to {@code setEndTime}
     * @param timeUnit value handed to {@code setTimeUnit}
     * @param str2 value handed to {@code setDownloadUrl}; the tests here pass {@code null}
     * @return the populated metadata instance
     */
    private SegmentZKMetadata getSegmentZKMetadata(String str, long j, long j2, TimeUnit timeUnit, String str2) {
        final long fixedTotalDocs = 1000L;
        final SegmentZKMetadata fixture = new SegmentZKMetadata(str);
        fixture.setStartTime(j);
        fixture.setEndTime(j2);
        fixture.setTimeUnit(timeUnit);
        fixture.setDownloadUrl(str2);
        fixture.setTotalDocs(fixedTotalDocs);
        return fixture;
    }
}
