package org.apache.pinot.controller.helix.core.minion.generator;

import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.class */
public class RealtimeToOfflineSegmentsTaskGeneratorTest {
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
    private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
    private final Map<String, String> streamConfigs = new HashMap();

    @BeforeClass
    public void setup() {
        this.streamConfigs.put("streamType", "kafka");
        this.streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.type"), StreamConfig.ConsumerType.LOWLEVEL.toString());
        this.streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), "myTopic");
        this.streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"), "org.foo.Decoder");
    }

    private TableConfig getRealtimeTableConfig(Map<String, Map<String, String>> map) {
        return new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME).setStreamConfigs(this.streamConfigs).setTaskConfig(new TableTaskConfig(map)).build();
    }

    @Test
    public void testGenerateTasksCheckConfigs() {
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(new HashMap());
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 5000L, 50000L, TimeUnit.MILLISECONDS, null)}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build()})).isEmpty());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(new HashMap());
        realtimeTableConfig.setTaskConfig((TableTaskConfig) null);
        try {
            realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig}));
            Assert.fail("Should have failed for null tableTaskConfig");
        } catch (IllegalStateException e) {
        }
        try {
            realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{getRealtimeTableConfig(new HashMap())}));
            Assert.fail("Should have failed for null taskConfig");
        } catch (IllegalStateException e2) {
        }
    }

    @Test
    public void testGenerateTasksSimultaneousConstraints() {
        HashMap hashMap = new HashMap();
        hashMap.put("RealtimeToOfflineSegmentsTask", new HashMap());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(hashMap);
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        HashMap hashMap2 = new HashMap();
        String str = "Task_RealtimeToOfflineSegmentsTask_" + System.currentTimeMillis();
        HashMap hashMap3 = new HashMap();
        hashMap3.put("tableName", REALTIME_TABLE_NAME);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(hashMap2);
        Mockito.when(clusterInfoAccessor.getTaskConfigs(str)).thenReturn(Lists.newArrayList(new PinotTaskConfig[]{new PinotTaskConfig("RealtimeToOfflineSegmentsTask", hashMap3)}));
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100000L));
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 80000000L, 90000000L, TimeUnit.MILLISECONDS, null)}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        hashMap2.put(str, TaskState.IN_PROGRESS);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
        hashMap2.put(str, TaskState.COMPLETED);
        Assert.assertEquals(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).size(), 1);
        String str2 = "Task_RealtimeToOfflineSegmentsTask_" + (System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3L));
        hashMap2.remove(str);
        hashMap2.put(str2, TaskState.IN_PROGRESS);
        Assert.assertEquals(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).size(), 1);
    }

    @Test
    public void testGenerateTasksNoSegments() {
        HashMap hashMap = new HashMap();
        hashMap.put("RealtimeToOfflineSegmentsTask", new HashMap());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(hashMap);
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(new HashMap());
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
        LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata = getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.IN_PROGRESS, -1L, -1L, TimeUnit.MILLISECONDS, null);
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{realtimeSegmentZKMetadata}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator2 = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator2.init(clusterInfoAccessor);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator2.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{realtimeSegmentZKMetadata, getRealtimeSegmentZKMetadata("testTable__1__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 5000L, 10000L, TimeUnit.MILLISECONDS, null), getRealtimeSegmentZKMetadata("testTable__1__1__13456", CommonConstants.Segment.Realtime.Status.IN_PROGRESS, -1L, -1L, TimeUnit.MILLISECONDS, null)}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator3 = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator3.init(clusterInfoAccessor);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator3.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
    }

    @Test
    public void testGenerateTasksNoMinionMetadata() {
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(new HashMap());
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn((Object) null);
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 1590048000000L, 1590134400000L, TimeUnit.MILLISECONDS, "download1"), getRealtimeSegmentZKMetadata("testTable__1__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 1590048000000L, 1590134400000L, TimeUnit.MILLISECONDS, "download2")}));
        HashMap hashMap = new HashMap();
        hashMap.put("RealtimeToOfflineSegmentsTask", new HashMap());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(hashMap);
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        List generateTasks = realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig}));
        Assert.assertEquals(generateTasks.size(), 1);
        Assert.assertEquals(((PinotTaskConfig) generateTasks.get(0)).getTaskType(), "RealtimeToOfflineSegmentsTask");
        Map configs = ((PinotTaskConfig) generateTasks.get(0)).getConfigs();
        Assert.assertEquals((String) configs.get("tableName"), REALTIME_TABLE_NAME);
        Assert.assertEquals((String) configs.get("segmentName"), "testTable__0__0__12345,testTable__1__0__12345");
        Assert.assertEquals((String) configs.get("downloadURL"), "download1,download2");
        Assert.assertEquals((String) configs.get("windowStartMs"), "1590019200000");
        Assert.assertEquals((String) configs.get("windowEndMs"), "1590105600000");
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 441680L, 441703L, TimeUnit.HOURS, "download1"), getRealtimeSegmentZKMetadata("testTable__1__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 441680L, 441703L, TimeUnit.HOURS, "download2")}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator2 = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator2.init(clusterInfoAccessor);
        List generateTasks2 = realtimeToOfflineSegmentsTaskGenerator2.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig}));
        Assert.assertEquals(generateTasks2.size(), 1);
        Assert.assertEquals(((PinotTaskConfig) generateTasks2.get(0)).getTaskType(), "RealtimeToOfflineSegmentsTask");
        Map configs2 = ((PinotTaskConfig) generateTasks2.get(0)).getConfigs();
        Assert.assertEquals((String) configs2.get("tableName"), REALTIME_TABLE_NAME);
        Assert.assertEquals((String) configs2.get("segmentName"), "testTable__0__0__12345,testTable__1__0__12345");
        Assert.assertEquals((String) configs2.get("downloadURL"), "download1,download2");
        Assert.assertEquals((String) configs2.get("windowStartMs"), "1590019200000");
        Assert.assertEquals((String) configs2.get("windowEndMs"), "1590105600000");
    }

    @Test
    public void testGenerateTasksWithMinionMetadata() {
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(new HashMap());
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L));
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 1589972400000L, 1590048000000L, TimeUnit.MILLISECONDS, "download1"), getRealtimeSegmentZKMetadata("testTable__0__1__12345", CommonConstants.Segment.Realtime.Status.DONE, 1590048000000L, 1590134400000L, TimeUnit.MILLISECONDS, "download2")}));
        HashMap hashMap = new HashMap();
        hashMap.put("RealtimeToOfflineSegmentsTask", new HashMap());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(hashMap);
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        List generateTasks = realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig}));
        Assert.assertEquals(generateTasks.size(), 1);
        Assert.assertEquals(((PinotTaskConfig) generateTasks.get(0)).getTaskType(), "RealtimeToOfflineSegmentsTask");
        Map configs = ((PinotTaskConfig) generateTasks.get(0)).getConfigs();
        Assert.assertEquals((String) configs.get("tableName"), REALTIME_TABLE_NAME);
        Assert.assertEquals((String) configs.get("segmentName"), "testTable__0__0__12345,testTable__0__1__12345");
        Assert.assertEquals((String) configs.get("downloadURL"), "download1,download2");
        Assert.assertEquals((String) configs.get("windowStartMs"), "1590019200000");
        Assert.assertEquals((String) configs.get("windowEndMs"), "1590105600000");
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator2 = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator2.init(clusterInfoAccessor);
        Assert.assertEquals(realtimeToOfflineSegmentsTaskGenerator2.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).size(), 0);
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L));
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        hashMap3.put("bucketTimePeriod", "2h");
        hashMap2.put("RealtimeToOfflineSegmentsTask", hashMap3);
        List generateTasks2 = realtimeToOfflineSegmentsTaskGenerator2.generateTasks(Lists.newArrayList(new TableConfig[]{getRealtimeTableConfig(hashMap2)}));
        Assert.assertEquals(generateTasks2.size(), 1);
        Assert.assertEquals(((PinotTaskConfig) generateTasks2.get(0)).getTaskType(), "RealtimeToOfflineSegmentsTask");
        Map configs2 = ((PinotTaskConfig) generateTasks2.get(0)).getConfigs();
        Assert.assertEquals((String) configs2.get("tableName"), REALTIME_TABLE_NAME);
        Assert.assertEquals((String) configs2.get("segmentName"), "testTable__0__0__12345");
        Assert.assertEquals((String) configs2.get("downloadURL"), "download1");
        Assert.assertEquals((String) configs2.get("windowStartMs"), "1590019200000");
        Assert.assertEquals((String) configs2.get("windowEndMs"), "1590026400000");
        HashMap hashMap4 = new HashMap();
        HashMap hashMap5 = new HashMap();
        hashMap5.put("timeColumnTransformFunction", "foo");
        hashMap5.put("collectorType", "rollup");
        hashMap5.put("m1.aggregationType", "MAX");
        hashMap4.put("RealtimeToOfflineSegmentsTask", hashMap5);
        TableConfig realtimeTableConfig2 = getRealtimeTableConfig(hashMap4);
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator3 = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator3.init(clusterInfoAccessor);
        List generateTasks3 = realtimeToOfflineSegmentsTaskGenerator3.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig2}));
        Assert.assertEquals(generateTasks3.size(), 1);
        Assert.assertEquals(((PinotTaskConfig) generateTasks3.get(0)).getTaskType(), "RealtimeToOfflineSegmentsTask");
        Map configs3 = ((PinotTaskConfig) generateTasks3.get(0)).getConfigs();
        Assert.assertEquals((String) configs3.get("tableName"), REALTIME_TABLE_NAME);
        Assert.assertEquals((String) configs3.get("segmentName"), "testTable__0__0__12345,testTable__0__1__12345");
        Assert.assertEquals((String) configs3.get("downloadURL"), "download1,download2");
        Assert.assertEquals((String) configs3.get("windowStartMs"), "1590019200000");
        Assert.assertEquals((String) configs3.get("windowEndMs"), "1590105600000");
        Assert.assertEquals((String) configs3.get("timeColumnTransformFunction"), "foo");
        Assert.assertEquals((String) configs3.get("collectorType"), "rollup");
        Assert.assertEquals((String) configs3.get("m1.aggregationType"), "MAX");
    }

    @Test
    public void testOverflowIntoConsuming() {
        HashMap hashMap = new HashMap();
        hashMap.put("RealtimeToOfflineSegmentsTask", new HashMap());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(hashMap);
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(new HashMap());
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100000L));
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 50000L, 150000L, TimeUnit.MILLISECONDS, null), getRealtimeSegmentZKMetadata("testTable__0__1__12345", CommonConstants.Segment.Realtime.Status.IN_PROGRESS, -1L, -1L, TimeUnit.MILLISECONDS, null)}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 100000L, 200000L, TimeUnit.MILLISECONDS, null), getRealtimeSegmentZKMetadata("testTable__0__1__12345", CommonConstants.Segment.Realtime.Status.IN_PROGRESS, -1L, -1L, TimeUnit.MILLISECONDS, null)}));
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, 200000L, 86500000L, TimeUnit.MILLISECONDS, null), getRealtimeSegmentZKMetadata("testTable__0__1__12345", CommonConstants.Segment.Realtime.Status.IN_PROGRESS, -1L, -1L, TimeUnit.MILLISECONDS, null)}));
        Assert.assertEquals(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).size(), 1);
    }

    @Test
    public void testBuffer() {
        HashMap hashMap = new HashMap();
        hashMap.put("RealtimeToOfflineSegmentsTask", new HashMap());
        TableConfig realtimeTableConfig = getRealtimeTableConfig(hashMap);
        long currentTimeMillis = System.currentTimeMillis();
        long millis = currentTimeMillis - TimeUnit.DAYS.toMillis(1L);
        ClusterInfoAccessor clusterInfoAccessor = (ClusterInfoAccessor) Mockito.mock(ClusterInfoAccessor.class);
        Mockito.when(clusterInfoAccessor.getTaskStates("RealtimeToOfflineSegmentsTask")).thenReturn(new HashMap());
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, millis));
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, millis - 100, millis + 100, TimeUnit.MILLISECONDS, null)}));
        RealtimeToOfflineSegmentsTaskGenerator realtimeToOfflineSegmentsTaskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
        realtimeToOfflineSegmentsTaskGenerator.init(clusterInfoAccessor);
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig})).isEmpty());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("bufferTimePeriod", "15d");
        hashMap.put("RealtimeToOfflineSegmentsTask", hashMap2);
        TableConfig realtimeTableConfig2 = getRealtimeTableConfig(hashMap);
        long millis2 = currentTimeMillis - TimeUnit.DAYS.toMillis(10L);
        Mockito.when(clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, millis2));
        Mockito.when(clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList(new LLCRealtimeSegmentZKMetadata[]{getRealtimeSegmentZKMetadata("testTable__0__0__12345", CommonConstants.Segment.Realtime.Status.DONE, millis2 - 100, millis2 + 100, TimeUnit.MILLISECONDS, null)}));
        Assert.assertTrue(realtimeToOfflineSegmentsTaskGenerator.generateTasks(Lists.newArrayList(new TableConfig[]{realtimeTableConfig2})).isEmpty());
    }

    private LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String str, CommonConstants.Segment.Realtime.Status status, long j, long j2, TimeUnit timeUnit, String str2) {
        LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata = new LLCRealtimeSegmentZKMetadata();
        lLCRealtimeSegmentZKMetadata.setSegmentName(str);
        lLCRealtimeSegmentZKMetadata.setStatus(status);
        lLCRealtimeSegmentZKMetadata.setStartTime(j);
        lLCRealtimeSegmentZKMetadata.setEndTime(j2);
        lLCRealtimeSegmentZKMetadata.setTimeUnit(timeUnit);
        lLCRealtimeSegmentZKMetadata.setDownloadUrl(str2);
        return lLCRealtimeSegmentZKMetadata;
    }
}
