package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.class */
public class SeekableStreamSupervisorSpecTest extends EasyMockSupport {
    // Unless a comment says otherwise, each collaborator below is replaced with a fresh EasyMock mock in setUp().
    private SeekableStreamSupervisorIngestionSpec ingestionSchema;
    private TaskStorage taskStorage;
    private TaskMaster taskMaster;
    private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private ServiceEmitter emitter;
    private RowIngestionMetersFactory rowIngestionMetersFactory;
    private DataSchema dataSchema;
    private SeekableStreamSupervisorTuningConfig seekableStreamSupervisorTuningConfig;
    private SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig;
    // Shared JSON mapper handed to the supervisor constructor in BaseTestSeekableStreamSupervisor.
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
    // Mock client factory handed to the supervisor constructor in BaseTestSeekableStreamSupervisor.
    private SeekableStreamIndexTaskClientFactory taskClientFactory;
    // Stream name used for the sequence numbers and the report payload built by the test supervisor.
    private static final String STREAM = "stream";
    // Datasource name used for the report payload and for the LagBasedAutoScaler instances under test.
    private static final String DATASOURCE = "testDS";
    // Mocked spec from which the test supervisors are constructed.
    private SeekableStreamSupervisorSpec spec;
    // Real (non-mock) instance created in setUp(); the scaling tests make the mocked spec return it.
    private SupervisorStateManagerConfig supervisorConfig;
    // Mock supervisor passed to TestSeekableStreamSupervisorSpec and to createAutoscaler().
    private SeekableStreamSupervisor supervisor4;
    // Mock client factory passed to TestSeekableStreamSupervisorSpec.
    private SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
    // Real DefaultObjectMapper created in setUp(); converts property maps into auto-scaler configs.
    private ObjectMapper mapper;
    private DruidMonitorSchedulerConfig monitorSchedulerConfig;
    // Mock config passed to TestSeekableStreamSupervisorSpec; distinct from the real supervisorConfig above.
    private SupervisorStateManagerConfig supervisorStateManagerConfig;

    /**
     * Skeleton {@link SeekableStreamSupervisor} used by the scaling tests.
     *
     * <p>It is constructed from the enclosing test's collaborator fields and answers every hook with a
     * fixed value (or {@code null}); nothing here talks to a real stream.
     *
     * <p>The marker getters carry their plain names ({@code getNotSetMarker},
     * {@code getEndOfPartitionMarker}). The decompiled copy had them under mangled names
     * ({@code m131...}/{@code m130...}), which would not override the superclass methods they were
     * renamed from.
     */
    private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity> {
        /** Wires the supervisor to the enclosing test's fields, using the fixed id {@code "testSupervisorId"}. */
        private BaseTestSeekableStreamSupervisor() {
            super("testSupervisorId", SeekableStreamSupervisorSpecTest.this.taskStorage, SeekableStreamSupervisorSpecTest.this.taskMaster, SeekableStreamSupervisorSpecTest.this.indexerMetadataStorageCoordinator, SeekableStreamSupervisorSpecTest.this.taskClientFactory, SeekableStreamSupervisorSpecTest.OBJECT_MAPPER, SeekableStreamSupervisorSpecTest.this.spec, SeekableStreamSupervisorSpecTest.this.rowIngestionMetersFactory, false);
        }

        /** @return the fixed task name prefix {@code "test"} */
        protected String baseTaskName() {
            return "test";
        }

        /** Intentionally a no-op: these tests never read lag from a stream. */
        protected void updatePartitionLagFromStream() {
        }

        /** @return always {@code null}; no per-partition record lag is tracked */
        @Nullable
        protected Map<String, Long> getPartitionRecordLag() {
            return null;
        }

        /** @return always {@code null}; no per-partition time lag is tracked */
        @Nullable
        protected Map<String, Long> getPartitionTimeLag() {
            return null;
        }

        /**
         * Builds an anonymous task IO config for {@link SeekableStreamSupervisorSpecTest#STREAM} from the
         * given start offsets ({@code map}, with {@code set} as the third start-sequence argument) and
         * end offsets ({@code map2}); the input format is taken from the supplied supervisor IO config.
         */
        protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int i, Map<String, String> map, Map<String, String> map2, String str, DateTime dateTime, DateTime dateTime2, Set<String> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig) {
            return new SeekableStreamIndexTaskIOConfig<String, String>(Integer.valueOf(i), str, new SeekableStreamStartSequenceNumbers(SeekableStreamSupervisorSpecTest.STREAM, map, set), new SeekableStreamEndSequenceNumbers(SeekableStreamSupervisorSpecTest.STREAM, map2), true, dateTime, dateTime2, seekableStreamSupervisorIOConfig.getInputFormat()) {
            };
        }

        /** @return always {@code null}; this stub never creates index tasks */
        protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(int i, String str, ObjectMapper objectMapper, TreeMap<Integer, Map<String, String>> treeMap, SeekableStreamIndexTaskIOConfig seekableStreamIndexTaskIOConfig, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) {
            return null;
        }

        /** @return always {@code 0}: every partition maps to the same task group */
        public int getTaskGroupIdForPartition(String str) {
            return 0;
        }

        /** @return always {@code true} */
        protected boolean checkSourceMetadataMatch(DataSourceMetadata dataSourceMetadata) {
            return true;
        }

        /** @return always {@code true} */
        protected boolean doesTaskTypeMatchSupervisor(Task task) {
            return true;
        }

        /** @return always {@code null}; reset metadata is not exercised here */
        protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String str, Map<String, String> map) {
            return null;
        }

        /**
         * Wraps a string offset in an {@link OrderedSequenceNumber} whose ordering is numeric: both
         * sides are parsed as {@link BigInteger} before comparison.
         */
        public OrderedSequenceNumber<String> makeSequenceNumber(String str, boolean z) {
            return new OrderedSequenceNumber<String>(str, z) {
                public int compareTo(OrderedSequenceNumber<String> orderedSequenceNumber) {
                    return new BigInteger((String) get()).compareTo(new BigInteger((String) orderedSequenceNumber.get()));
                }
            };
        }

        /** @return always {@code null} */
        protected Map<String, Long> getRecordLagPerPartition(Map<String, String> map) {
            return null;
        }

        /** @return always {@code null} */
        protected Map<String, Long> getTimeLagPerPartition(Map<String, String> map) {
            return null;
        }

        /** @return the supervisor's current {@code recordSupplier} field; no new supplier is created */
        protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() {
            return this.recordSupplier;
        }

        /** Builds a fixed report payload for the test datasource and stream; both arguments are ignored. */
        protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int i, boolean z) {
            return new SeekableStreamSupervisorReportPayload<String, String>(SeekableStreamSupervisorSpecTest.DATASOURCE, SeekableStreamSupervisorSpecTest.STREAM, 1, 1, 1L, null, null, null, null, null, null, false, true, null, null, null) {
            };
        }

        /** @return the literal marker {@code "NOT_SET"} */
        public String getNotSetMarker() {
            return "NOT_SET";
        }

        /** @return the literal marker {@code "EOF"} */
        public String getEndOfPartitionMarker() {
            return "EOF";
        }

        /** @return always {@code false} */
        public boolean isEndOfShard(String str) {
            return false;
        }

        /** @return always {@code false} */
        public boolean isShardExpirationMarker(String str) {
            return false;
        }

        /** @return always {@code false} */
        protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
            return false;
        }
    }

    /**
     * Concrete test supervisor: reports the partition count it was constructed with, reports zero
     * lag, and does not schedule any reporting.
     */
    private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor {
        /** Value handed back by {@link #getPartitionCount()}. */
        private final int fixedPartitionCount;

        /**
         * @param i partition count this supervisor should report
         */
        public TestSeekableStreamSupervisor(int i) {
            fixedPartitionCount = i;
        }

        /** Reporting is deliberately not scheduled; the executor is left untouched. */
        protected void scheduleReporting(ScheduledExecutorService scheduledExecutorService) {
            // nothing to schedule
        }

        /** @return lag statistics with all three values set to zero */
        public LagStats computeLagStats() {
            final long noLag = 0L;
            return new LagStats(noLag, noLag, noLag);
        }

        /** @return the partition count supplied at construction time */
        public int getPartitionCount() {
            return fixedPartitionCount;
        }
    }

    /**
     * Spec stub that hands back a pre-built supervisor and a fixed id. Type, source and the
     * suspend toggle all yield {@code null}; the datasource list is always empty.
     */
    private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec {
        /** Supervisor returned verbatim by {@link #createSupervisor()}. */
        private final SeekableStreamSupervisor cannedSupervisor;
        /** Identifier returned by {@link #getId()}. */
        private final String specId;

        /**
         * Forwards everything except the last two arguments to the superclass constructor.
         *
         * @param seekableStreamSupervisor supervisor to return from {@link #createSupervisor()}
         * @param str                      id to return from {@link #getId()}
         */
        public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec seekableStreamSupervisorIngestionSpec, @Nullable Map<String, Object> map, Boolean bool, TaskStorage taskStorage, TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, SeekableStreamIndexTaskClientFactory seekableStreamIndexTaskClientFactory, ObjectMapper objectMapper, ServiceEmitter serviceEmitter, DruidMonitorSchedulerConfig druidMonitorSchedulerConfig, RowIngestionMetersFactory rowIngestionMetersFactory, SupervisorStateManagerConfig supervisorStateManagerConfig, SeekableStreamSupervisor seekableStreamSupervisor, String str) {
            super(
                seekableStreamSupervisorIngestionSpec,
                map,
                bool,
                taskStorage,
                taskMaster,
                indexerMetadataStorageCoordinator,
                seekableStreamIndexTaskClientFactory,
                objectMapper,
                serviceEmitter,
                druidMonitorSchedulerConfig,
                rowIngestionMetersFactory,
                supervisorStateManagerConfig
            );
            cannedSupervisor = seekableStreamSupervisor;
            specId = str;
        }

        /** @return a new, empty, mutable list on every call */
        public List<String> getDataSources() {
            List<String> none = new ArrayList<>();
            return none;
        }

        /** @return the id supplied at construction time */
        public String getId() {
            return specId;
        }

        /** @return the supervisor supplied at construction time */
        public Supervisor createSupervisor() {
            return cannedSupervisor;
        }

        /** @return always {@code null} */
        public String getType() {
            return null;
        }

        /** @return always {@code null} */
        public String getSource() {
            return null;
        }

        /** @return always {@code null}; suspension is not modelled by this stub */
        protected SeekableStreamSupervisorSpec toggleSuspend(boolean z) {
            return null;
        }
    }

    /**
     * Re-creates every collaborator before each test: EasyMock mocks for the Druid services and
     * configs, plus a real {@link SupervisorStateManagerConfig} and a real {@link DefaultObjectMapper}.
     */
    @Before
    public void setUp() {
        // Real instances.
        supervisorConfig = new SupervisorStateManagerConfig();
        mapper = new DefaultObjectMapper();

        // Ingestion spec and the pieces it is stubbed to return.
        ingestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
        dataSchema = EasyMock.mock(DataSchema.class);
        seekableStreamSupervisorTuningConfig = EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
        seekableStreamSupervisorIOConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class);

        // Overlord-side services.
        taskStorage = EasyMock.mock(TaskStorage.class);
        taskMaster = EasyMock.mock(TaskMaster.class);
        indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class);
        emitter = EasyMock.mock(ServiceEmitter.class);
        rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
        monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
        supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class);

        // Task client factories (two independent mocks).
        taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
        indexTaskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);

        // Spec and supervisor mocks.
        spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
        supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
    }

    /**
     * Builds a supervisor tuning config with small fixed values: one worker thread, one chat thread,
     * one chat retry, synchronous chat, and fixed ISO-8601 durations for the various timeouts.
     *
     * <p>The nested task tuning config implements {@code withBasePersistDirectory(File)} under its
     * plain name. The decompiled copy had it as {@code m129withBasePersistDirectory}, which would not
     * override the superclass method it was renamed from.
     *
     * @return a new anonymous {@link SeekableStreamSupervisorTuningConfig}
     */
    private static SeekableStreamSupervisorTuningConfig getTuningConfig() {
        return new SeekableStreamSupervisorTuningConfig() {
            public Integer getWorkerThreads() {
                return 1;
            }

            public boolean getChatAsync() {
                return false;
            }

            public Integer getChatThreads() {
                return 1;
            }

            public Long getChatRetries() {
                return 1L;
            }

            public Duration getHttpTimeout() {
                return new Period("PT1M").toStandardDuration();
            }

            public Duration getShutdownTimeout() {
                return new Period("PT1S").toStandardDuration();
            }

            public Duration getRepartitionTransitionDuration() {
                return new Period("PT2M").toStandardDuration();
            }

            public Duration getOffsetFetchPeriod() {
                return new Period("PT5M").toStandardDuration();
            }

            /** Returns a task tuning config built entirely from {@code null} settings. */
            public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() {
                return new SeekableStreamIndexTaskTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) {
                    /** Not needed by these tests; always returns {@code null}. */
                    public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File file) {
                        return null;
                    }

                    public String toString() {
                        return null;
                    }
                };
            }
        };
    }

    /**
     * Checks how property maps are converted into {@link AutoScalerConfig}:
     * <ul>
     *   <li>an empty map yields a {@link LagBasedAutoScalerConfig} with auto-scaling disabled;</li>
     *   <li>{@code null} converts to {@code null};</li>
     *   <li>an explicit {@code "autoScalerStrategy": "lagBased"} yields the lag-based config;</li>
     *   <li>{@code lagCollectionIntervalMillis} is carried through;</li>
     *   <li>invalid configurations are rejected with a {@link RuntimeException}.</li>
     * </ul>
     */
    @Test
    public void testAutoScalerConfig() {
        AutoScalerConfig fromEmptyMap = this.mapper.convertValue(new HashMap<>(), AutoScalerConfig.class);
        Assert.assertTrue(fromEmptyMap instanceof LagBasedAutoScalerConfig);
        Assert.assertFalse(fromEmptyMap.getEnableTaskAutoScaler());

        Assert.assertNull(this.mapper.convertValue((Object) null, AutoScalerConfig.class));

        AutoScalerConfig fromStrategy = this.mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class);
        Assert.assertTrue(fromStrategy instanceof LagBasedAutoScalerConfig);

        // Keep the general type until the instanceof assertion has run, then narrow explicitly.
        AutoScalerConfig fromInterval = this.mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class);
        Assert.assertTrue(fromInterval instanceof LagBasedAutoScalerConfig);
        LagBasedAutoScalerConfig lagBasedConfig = (LagBasedAutoScalerConfig) fromInterval;
        Assert.assertEquals(1L, lagBasedConfig.getLagCollectionIntervalMillis());

        // NOTE(review): both try blocks below were empty in the decompiled copy, so the
        // assertNotNull checks could never pass. The invalid inputs are reconstructed from the
        // property names used elsewhere in this test class; confirm them against the
        // validation performed by LagBasedAutoScalerConfig.

        // Auto-scaling enabled with taskCountMax lower than taskCountMin.
        RuntimeException maxBelowMinFailure = null;
        try {
            this.mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
        } catch (RuntimeException e) {
            maxBelowMinFailure = e;
        }
        Assert.assertNotNull(maxBelowMinFailure);

        // Auto-scaling enabled without taskCountMax / taskCountMin.
        RuntimeException missingBoundsFailure = null;
        try {
            this.mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", "true"), AutoScalerConfig.class);
        } catch (RuntimeException e) {
            missingBoundsFailure = e;
        }
        Assert.assertNotNull(missingBoundsFailure);
    }

    /**
     * Verifies which auto-scaler implementation the spec creates: a {@link LagBasedAutoScaler} when
     * {@code enableTaskAutoScaler} is {@code true}, and a {@link NoopTaskAutoScaler} when the flag
     * is {@code false}, absent, or the whole property map is empty.
     */
    @Test
    public void testAutoScalerCreated() {
        final Map<String, Object> scalerProps = new HashMap<>();
        scalerProps.put("enableTaskAutoScaler", true);
        scalerProps.put("lagCollectionIntervalMillis", 500);
        scalerProps.put("lagCollectionRangeMillis", 500);
        scalerProps.put("scaleOutThreshold", 5000000);
        scalerProps.put("triggerScaleOutFractionThreshold", 0.3d);
        scalerProps.put("scaleInThreshold", 1000000);
        scalerProps.put("triggerScaleInFractionThreshold", 0.8d);
        scalerProps.put("scaleActionStartDelayMillis", 0);
        scalerProps.put("scaleActionPeriodMillis", 100);
        scalerProps.put("taskCountMax", 8);
        scalerProps.put("taskCountMin", 1);
        scalerProps.put("scaleInStep", 1);
        scalerProps.put("scaleOutStep", 2);
        scalerProps.put("minTriggerScaleActionFrequencyMillis", 1200000);

        // Ingestion spec hands out the mocked IO / data / tuning configs.
        EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
        EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(ingestionSchema);

        // Case 1: auto-scaler explicitly enabled.
        EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
                .andReturn(mapper.convertValue(scalerProps, AutoScalerConfig.class))
                .anyTimes();
        EasyMock.replay(seekableStreamSupervisorIOConfig);

        EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
        EasyMock.replay(supervisor4);

        final TestSeekableStreamSupervisorSpec specUnderTest = new TestSeekableStreamSupervisorSpec(
                ingestionSchema,
                null,
                false,
                taskStorage,
                taskMaster,
                indexerMetadataStorageCoordinator,
                indexTaskClientFactory,
                mapper,
                emitter,
                monitorSchedulerConfig,
                rowIngestionMetersFactory,
                supervisorStateManagerConfig,
                supervisor4,
                "id1"
        );
        Assert.assertTrue(specUnderTest.createAutoscaler(supervisor4) instanceof LagBasedAutoScaler);

        // Case 2: auto-scaler explicitly disabled.
        EasyMock.reset(seekableStreamSupervisorIOConfig);
        scalerProps.put("enableTaskAutoScaler", false);
        EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
                .andReturn(mapper.convertValue(scalerProps, AutoScalerConfig.class))
                .anyTimes();
        EasyMock.replay(seekableStreamSupervisorIOConfig);
        Assert.assertTrue(specUnderTest.createAutoscaler(supervisor4) instanceof NoopTaskAutoScaler);

        // Case 3: enable flag missing altogether.
        EasyMock.reset(seekableStreamSupervisorIOConfig);
        scalerProps.remove("enableTaskAutoScaler");
        EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
                .andReturn(mapper.convertValue(scalerProps, AutoScalerConfig.class))
                .anyTimes();
        EasyMock.replay(seekableStreamSupervisorIOConfig);
        Assert.assertTrue(specUnderTest.createAutoscaler(supervisor4) instanceof NoopTaskAutoScaler);

        // Case 4: no properties at all.
        EasyMock.reset(seekableStreamSupervisorIOConfig);
        scalerProps.clear();
        EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
                .andReturn(mapper.convertValue(scalerProps, AutoScalerConfig.class))
                .anyTimes();
        EasyMock.replay(seekableStreamSupervisorIOConfig);
        Assert.assertTrue(scalerProps.isEmpty());
        Assert.assertTrue(specUnderTest.createAutoscaler(supervisor4) instanceof NoopTaskAutoScaler);
    }

    /**
     * Creates an auto-scaler from a config that only sets {@code lagCollectionIntervalMillis},
     * {@code enableTaskAutoScaler}, {@code taskCountMax} and {@code taskCountMin}, then checks that
     * the supplied values are carried through and that every other setting has the expected value
     * for a property that was left out.
     */
    @Test
    public void testDefaultAutoScalerConfigCreatedWithDefault() {
        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.seekableStreamSupervisorIOConfig.getAutoScalerConfig()).andReturn(this.mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes();
        EasyMock.replay(this.seekableStreamSupervisorIOConfig);

        EasyMock.expect(this.supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
        EasyMock.replay(this.supervisor4);

        TestSeekableStreamSupervisorSpec supervisorSpec = new TestSeekableStreamSupervisorSpec(this.ingestionSchema, null, false, this.taskStorage, this.taskMaster, this.indexerMetadataStorageCoordinator, this.indexTaskClientFactory, this.mapper, this.emitter, this.monitorSchedulerConfig, this.rowIngestionMetersFactory, this.supervisorStateManagerConfig, this.supervisor4, "id1");

        // Hold the result as Object so the instanceof assertion is a real check and the narrowing
        // to LagBasedAutoScaler is an explicit cast rather than an implicit (non-compiling) one.
        Object createdAutoScaler = supervisorSpec.createAutoscaler(this.supervisor4);
        Assert.assertTrue(createdAutoScaler instanceof LagBasedAutoScaler);
        LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) createdAutoScaler;
        LagBasedAutoScalerConfig autoScalerConfig = lagBasedAutoScaler.getAutoScalerConfig();

        // Explicitly supplied values (expected value first, per the JUnit convention).
        Assert.assertEquals(1L, autoScalerConfig.getLagCollectionIntervalMillis());
        Assert.assertEquals(4L, autoScalerConfig.getTaskCountMax());
        Assert.assertEquals(1L, autoScalerConfig.getTaskCountMin());

        // Values for properties that were not supplied.
        Assert.assertEquals(600000L, autoScalerConfig.getLagCollectionRangeMillis());
        Assert.assertEquals(300000L, autoScalerConfig.getScaleActionStartDelayMillis());
        Assert.assertEquals(60000L, autoScalerConfig.getScaleActionPeriodMillis());
        Assert.assertEquals(6000000L, autoScalerConfig.getScaleOutThreshold());
        Assert.assertEquals(1000000L, autoScalerConfig.getScaleInThreshold());
        Assert.assertEquals(1L, autoScalerConfig.getScaleInStep());
        Assert.assertEquals(2L, autoScalerConfig.getScaleOutStep());
        Assert.assertEquals(600000L, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis());
    }

    /**
     * Runs a three-partition test supervisor with a lag-based auto-scaler built from
     * {@code getScaleOutProperties(2)} and expects the task count to go from 1 to 2 within one
     * second.
     *
     * <p>The auto-scaler is reset and stopped in a {@code finally} block so a failed assertion
     * does not skip its cleanup.
     *
     * @throws InterruptedException if the wait between the two task-count checks is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException {
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
        LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, this.mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), this.spec);

        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            Assert.assertEquals(1L, supervisor.getIoConfig().getTaskCount().intValue());
            // Wait before re-reading the task count; presumably long enough for the auto-scaler's
            // scheduled work to run (its timing properties are defined outside this method).
            Thread.sleep(1000L);
            Assert.assertEquals(2L, supervisor.getIoConfig().getTaskCount().intValue());
        } finally {
            // Always clean up, even when an assertion above fails.
            try {
                autoScaler.reset();
            } finally {
                autoScaler.stop();
            }
        }
    }

    /**
     * Runs a two-partition test supervisor with a lag-based auto-scaler built from
     * {@code getScaleOutProperties(3)} and expects the task count to go from 1 to 2 within one
     * second.
     *
     * <p>The auto-scaler is reset and stopped in a {@code finally} block so a failed assertion
     * does not skip its cleanup.
     *
     * @throws InterruptedException if the wait between the two task-count checks is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException {
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2);
        LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, this.mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), this.spec);

        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            Assert.assertEquals(1L, supervisor.getIoConfig().getTaskCount().intValue());
            // Wait before re-reading the task count; presumably long enough for the auto-scaler's
            // scheduled work to run (its timing properties are defined outside this method).
            Thread.sleep(1000L);
            Assert.assertEquals(2L, supervisor.getIoConfig().getTaskCount().intValue());
        } finally {
            // Always clean up, even when an assertion above fails.
            try {
                autoScaler.reset();
            } finally {
                autoScaler.stop();
            }
        }
    }

    /**
     * Runs a three-partition test supervisor with a lag-based auto-scaler built from
     * {@code getScaleInProperties()}. The task count is raised to 2 before start-up and is
     * expected to drop back to 1 within one second.
     *
     * <p>The auto-scaler is reset and stopped in a {@code finally} block so a failed assertion
     * does not skip its cleanup.
     *
     * @throws InterruptedException if the wait between the two task-count checks is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException {
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
        LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, this.mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), this.spec);

        // Starts at one task; bump to two so there is something to scale in from.
        Assert.assertEquals(1L, supervisor.getIoConfig().getTaskCount().intValue());
        supervisor.getIoConfig().setTaskCount(2);

        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            Assert.assertEquals(2L, supervisor.getIoConfig().getTaskCount().intValue());
            // Wait before re-reading the task count; presumably long enough for the auto-scaler's
            // scheduled work to run (its timing properties are defined outside this method).
            Thread.sleep(1000L);
            Assert.assertEquals(1L, supervisor.getIoConfig().getTaskCount().intValue());
        } finally {
            // Always clean up, even when an assertion above fails.
            try {
                autoScaler.reset();
            } finally {
                autoScaler.stop();
            }
        }
    }

    /**
     * Runs a three-partition test supervisor alongside a {@link NoopTaskAutoScaler} and checks that
     * the task count is 1 both before and after a one-second wait.
     *
     * @throws InterruptedException if the wait between the two task-count checks is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException {
        // IO config built by hand; the trailing five constructor arguments are all null.
        final SeekableStreamSupervisorIOConfig plainIoConfig = new SeekableStreamSupervisorIOConfig(
                STREAM,
                new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
                1,
                1,
                new Period("PT1H"),
                new Period("P1D"),
                new Period("PT30S"),
                false,
                new Period("PT30M"),
                null,
                null,
                null,
                null,
                null
        ) {
        };

        // The mocked spec serves the hand-built IO config ...
        EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
        EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(spec.getIoConfig()).andReturn(plainIoConfig).anyTimes();
        EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(spec);

        // ... while the mocked ingestion spec serves the mocked IO config field.
        EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
        EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(ingestionSchema);

        EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(taskMaster);

        final TestSeekableStreamSupervisor supervisorUnderTest = new TestSeekableStreamSupervisor(3);
        final NoopTaskAutoScaler noopScaler = new NoopTaskAutoScaler();

        supervisorUnderTest.start();
        noopScaler.start();
        supervisorUnderTest.runInternal();

        final int taskCountBeforeWait = supervisorUnderTest.getIoConfig().getTaskCount().intValue();
        Assert.assertEquals(1L, taskCountBeforeWait);

        Thread.sleep(1000L);

        final int taskCountAfterWait = supervisorUnderTest.getIoConfig().getTaskCount().intValue();
        Assert.assertEquals(1L, taskCountAfterWait);

        noopScaler.reset();
        noopScaler.stop();
    }

    /**
     * Checks that an {@link IdleConfig} constructed with {@code true} as its first argument and
     * attached to the IO config is still reported as enabled when read back through
     * {@code spec.getIoConfig().getIdleConfig()}.
     */
    @Test
    public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled() {
        SeekableStreamSupervisorIOConfig idleEnabledIoConfig = new SeekableStreamSupervisorIOConfig(
            STREAM,
            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
            1,
            1,
            new Period("PT1H"),
            new Period("P1D"),
            new Period("PT30S"),
            false,
            new Period("PT30M"),
            null,
            null,
            null,
            null,
            new IdleConfig(true, (Long) null)
        ) {
            // Empty anonymous subclass; presumably the base class is abstract.
        };
        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(idleEnabledIoConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        // Only the IO config plumbing is under test, so every member the anonymous subclass has to
        // supply is stubbed to return null.
        this.spec = new SeekableStreamSupervisorSpec(
            this.ingestionSchema,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            this.supervisorStateManagerConfig
        ) {
            @Override
            public Supervisor createSupervisor() {
                return null;
            }

            @Override
            protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) {
                return null;
            }

            @Override
            public String getType() {
                return null;
            }

            @Override
            public String getSource() {
                return null;
            }
        };

        Assert.assertTrue(Objects.requireNonNull(this.spec.getIoConfig().getIdleConfig()).isEnabled());
    }

    /**
     * A spec constructed with a {@code null} context (second constructor argument) must return
     * {@code null} from {@code getContextValue("key")}.
     */
    @Test
    public void testGetContextVauleWithNullContextShouldReturnNull() {
        mockIngestionSchema();

        TestSeekableStreamSupervisorSpec specWithoutContext = new TestSeekableStreamSupervisorSpec(
            ingestionSchema,
            null,
            false,
            taskStorage,
            taskMaster,
            indexerMetadataStorageCoordinator,
            indexTaskClientFactory,
            mapper,
            emitter,
            monitorSchedulerConfig,
            rowIngestionMetersFactory,
            supervisorStateManagerConfig,
            supervisor4,
            "id1"
        );

        Assert.assertNull(specWithoutContext.getContextValue("key"));
    }

    /**
     * A spec whose context only holds {@code "key" -> "value"} must return {@code null} when asked
     * for {@code "key_not_exists"}.
     */
    @Test
    public void testGetContextVauleForNonExistentKeyShouldReturnNull() {
        mockIngestionSchema();

        TestSeekableStreamSupervisorSpec specWithContext = new TestSeekableStreamSupervisorSpec(
            ingestionSchema,
            ImmutableMap.of("key", "value"),
            false,
            taskStorage,
            taskMaster,
            indexerMetadataStorageCoordinator,
            indexTaskClientFactory,
            mapper,
            emitter,
            monitorSchedulerConfig,
            rowIngestionMetersFactory,
            supervisorStateManagerConfig,
            supervisor4,
            "id1"
        );

        Assert.assertNull(specWithContext.getContextValue("key_not_exists"));
    }

    /**
     * A spec whose context holds {@code "key" -> "value"} must return {@code "value"} from
     * {@code getContextValue("key")}.
     */
    @Test
    public void testGetContextVauleForKeyShouldReturnValue() {
        mockIngestionSchema();

        TestSeekableStreamSupervisorSpec specWithContext = new TestSeekableStreamSupervisorSpec(
            ingestionSchema,
            ImmutableMap.of("key", "value"),
            false,
            taskStorage,
            taskMaster,
            indexerMetadataStorageCoordinator,
            indexTaskClientFactory,
            mapper,
            emitter,
            monitorSchedulerConfig,
            rowIngestionMetersFactory,
            supervisorStateManagerConfig,
            supervisor4,
            "id1"
        );

        Assert.assertEquals("value", specWithContext.getContextValue("key"));
    }

    /**
     * Records expectations on the {@code ingestionSchema} mock so that its IO config, data schema
     * and tuning config getters return the corresponding fixture fields any number of times, then
     * switches the mock to replay mode.
     */
    private void mockIngestionSchema() {
        EasyMock.expect(ingestionSchema.getIOConfig())
            .andReturn(seekableStreamSupervisorIOConfig)
            .anyTimes();
        EasyMock.expect(ingestionSchema.getDataSchema())
            .andReturn(dataSchema)
            .anyTimes();
        EasyMock.expect(ingestionSchema.getTuningConfig())
            .andReturn(seekableStreamSupervisorTuningConfig)
            .anyTimes();
        EasyMock.replay(ingestionSchema);
    }

    /**
     * Builds the {@code DataSchema} fixture for {@code DATASOURCE}: an ISO {@code "timestamp"}
     * column, two string dimensions ({@code dim1}, {@code dim2}), a single {@code "rows"} count
     * aggregator, and a uniform granularity spec created from {@code Granularities.HOUR} and
     * {@code Granularities.NONE} with no intervals.
     *
     * @return a newly constructed schema on every call
     */
    private static DataSchema getDataSchema() {
        // Passing the list inline lets the element type be inferred from the constructor parameter,
        // which avoids the raw ArrayList (and its unchecked conversion) used previously.
        DimensionsSpec dimensionsSpec = new DimensionsSpec(
            ImmutableList.of(
                StringDimensionSchema.create("dim1"),
                StringDimensionSchema.create("dim2")
            )
        );
        return new DataSchema(
            DATASOURCE,
            new TimestampSpec("timestamp", "iso", (DateTime) null),
            dimensionsSpec,
            new AggregatorFactory[]{new CountAggregatorFactory("rows")},
            new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of()),
            (TransformSpec) null
        );
    }

    /**
     * Builds an IO config fixture whose autoscaler config is converted from one of the two
     * property maps defined in this class.
     *
     * @param i task count passed as the fourth constructor argument
     * @param z {@code true} to use {@code getScaleOutProperties(2)}, {@code false} to use
     *          {@code getScaleInProperties()}
     * @return a new anonymous {@code SeekableStreamSupervisorIOConfig} instance
     */
    private SeekableStreamSupervisorIOConfig getIOConfig(int i, boolean z) {
        // The two variants differ only in the autoscaler properties, so pick those first and
        // build the config once instead of repeating the whole constructor call per branch.
        Map<String, Object> autoScalerProperties = z ? getScaleOutProperties(2) : getScaleInProperties();
        AutoScalerConfig autoScalerConfig = this.mapper.convertValue(autoScalerProperties, AutoScalerConfig.class);

        return new SeekableStreamSupervisorIOConfig(
            STREAM,
            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
            1,
            i,
            new Period("PT1H"),
            new Period("P1D"),
            new Period("PT30S"),
            false,
            new Period("PT30M"),
            null,
            null,
            autoScalerConfig,
            null,
            null
        ) {
            // Empty anonymous subclass; presumably the base class is abstract.
        };
    }

    /**
     * Builds the autoscaler property map that {@code getIOConfig} uses when its flag is
     * {@code true}.
     *
     * <p>Both scale-out settings ({@code scaleOutThreshold}, {@code triggerScaleOutFractionThreshold})
     * are zero while {@code scaleInThreshold} is 1000000 - presumably so that the autoscaler always
     * decides to scale out; confirm against the autoscaler config class.
     *
     * @param i value stored under {@code taskCountMax}
     * @return a new, mutable map of property name to value
     */
    private static Map<String, Object> getScaleOutProperties(int i) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("enableTaskAutoScaler", true);
        properties.put("lagCollectionIntervalMillis", 500);
        properties.put("lagCollectionRangeMillis", 500);
        properties.put("scaleOutThreshold", 0);
        properties.put("triggerScaleOutFractionThreshold", 0.0d);
        properties.put("scaleInThreshold", 1000000);
        properties.put("triggerScaleInFractionThreshold", 0.8d);
        properties.put("scaleActionStartDelayMillis", 0);
        properties.put("scaleActionPeriodMillis", 100);
        properties.put("taskCountMax", i);
        properties.put("taskCountMin", 1);
        properties.put("scaleInStep", 1);
        properties.put("scaleOutStep", 2);
        properties.put("minTriggerScaleActionFrequencyMillis", 1200000);
        return properties;
    }

    /**
     * Builds the autoscaler property map that {@code getIOConfig} uses when its flag is
     * {@code false}, and that the scale-in test converts into a lag-based autoscaler config.
     *
     * <p>Both scale-in settings ({@code scaleInThreshold}, {@code triggerScaleInFractionThreshold})
     * are zero while {@code scaleOutThreshold} is 8000000 - presumably so that the autoscaler always
     * decides to scale in; confirm against the autoscaler config class.
     *
     * @return a new, mutable map of property name to value
     */
    private static Map<String, Object> getScaleInProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("enableTaskAutoScaler", true);
        properties.put("lagCollectionIntervalMillis", 500);
        properties.put("lagCollectionRangeMillis", 500);
        properties.put("scaleOutThreshold", 8000000);
        properties.put("triggerScaleOutFractionThreshold", 0.3d);
        properties.put("scaleInThreshold", 0);
        properties.put("triggerScaleInFractionThreshold", 0.0d);
        properties.put("scaleActionStartDelayMillis", 0);
        properties.put("scaleActionPeriodMillis", 100);
        properties.put("taskCountMax", 2);
        properties.put("taskCountMin", 1);
        properties.put("scaleInStep", 1);
        properties.put("scaleOutStep", 2);
        properties.put("minTriggerScaleActionFrequencyMillis", 1200000);
        return properties;
    }
}
