package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.class */
public class SeekableStreamSupervisorStateTest extends EasyMockSupport {
    // Datasource name used when stubbing task-storage / metadata-coordinator lookups and in report payloads.
    private static final String DATASOURCE = "testDS";
    // Message carried by the exception that tests inject to simulate a failure.
    private static final String EXCEPTION_MSG = "I had an exception";
    // Collaborators below are (re)created as EasyMock mocks in setupTest() before every test.
    private TaskStorage taskStorage;
    private TaskMaster taskMaster;
    private TaskRunner taskRunner;
    private TaskQueue taskQueue;
    private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private SeekableStreamIndexTaskClientFactory taskClientFactory;
    private SeekableStreamSupervisorSpec spec;
    private SeekableStreamIndexTaskClient indexTaskClient;
    private RecordSupplier<String, String, ByteEntity> recordSupplier;
    // Real (non-mock) helpers, also re-instantiated in setupTest().
    private RowIngestionMetersFactory rowIngestionMetersFactory;
    private SupervisorStateManagerConfig supervisorConfig;
    // Emitter handed out by the mocked spec; it records every emitted event for later inspection.
    private TestEmitter emitter;
    // JSON mapper passed to the supervisors under test.
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
    // Stream name and the single shard/partition id that the mocked record supplier reports.
    private static final String STREAM = "stream";
    private static final String SHARD_ID = "0";
    private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);
    // Value returned by the mocked spec for the "tags" context key.
    private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$BaseTestSeekableStreamSupervisor.class */
    /**
     * Skeleton {@link SeekableStreamSupervisor} shared by the supervisors used in this test class.
     *
     * <p>It is wired to the mocks and helpers held by the enclosing test instance and answers every
     * stream-specific hook with a fixed, trivial value. Subclasses supply the remaining pieces
     * (reporting and lag statistics).
     *
     * <p>References to the enclosing instance are deliberately fully qualified so they can never be
     * captured by an inherited member of the same name.
     */
    public abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity> {
        private BaseTestSeekableStreamSupervisor() {
            super(
                "testSupervisorId",
                SeekableStreamSupervisorStateTest.this.taskStorage,
                SeekableStreamSupervisorStateTest.this.taskMaster,
                SeekableStreamSupervisorStateTest.this.indexerMetadataStorageCoordinator,
                SeekableStreamSupervisorStateTest.this.taskClientFactory,
                SeekableStreamSupervisorStateTest.OBJECT_MAPPER,
                SeekableStreamSupervisorStateTest.this.spec,
                SeekableStreamSupervisorStateTest.this.rowIngestionMetersFactory,
                false
            );
        }

        /** @return the fixed task-name prefix {@code "test"} */
        protected String baseTaskName() {
            return "test";
        }

        /** Intentionally a no-op: this base class tracks no lag. */
        protected void updatePartitionLagFromStream() {
        }

        /** @return always {@code null}; subclasses may override to report record lag */
        @Nullable
        protected Map<String, Long> getPartitionRecordLag() {
            return null;
        }

        /** @return always {@code null}; subclasses may override to report time lag */
        @Nullable
        protected Map<String, Long> getPartitionTimeLag() {
            return null;
        }

        /**
         * Builds an anonymous IO config for {@link #STREAM} from the given start offsets ({@code map}),
         * end offsets ({@code map2}) and exclusive-start partitions ({@code set}).
         */
        protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int i, Map<String, String> map, Map<String, String> map2, String str, DateTime dateTime, DateTime dateTime2, Set<String> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig) {
            return new SeekableStreamIndexTaskIOConfig<String, String>(
                Integer.valueOf(i),
                str,
                new SeekableStreamStartSequenceNumbers<>(SeekableStreamSupervisorStateTest.STREAM, map, set),
                new SeekableStreamEndSequenceNumbers<>(SeekableStreamSupervisorStateTest.STREAM, map2),
                true,
                dateTime,
                dateTime2,
                seekableStreamSupervisorIOConfig.getInputFormat()
            ) {
            };
        }

        /**
         * Always creates exactly one task with the id {@code "id"}, regardless of the requested replica
         * count. The task uses the test's data schema together with the supplied tuning and IO configs.
         */
        protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(int i, String str, ObjectMapper objectMapper, TreeMap<Integer, Map<String, String>> treeMap, SeekableStreamIndexTaskIOConfig seekableStreamIndexTaskIOConfig, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) {
            // Call the enclosing test's static schema factory by name; the compiler-generated
            // access$NNN() accessor emitted by the decompiler is not valid source.
            return ImmutableList.of(new TestSeekableStreamIndexTask("id", null, SeekableStreamSupervisorStateTest.getDataSchema(), seekableStreamIndexTaskTuningConfig, seekableStreamIndexTaskIOConfig, null, null));
        }

        /**
         * Maps a partition id to a task group: the numeric id modulo the configured task count.
         *
         * @return the task group id, or {@code 0} when the partition id is not a parsable integer
         */
        public int getTaskGroupIdForPartition(String str) {
            try {
                return Integer.parseInt(str) % SeekableStreamSupervisorStateTest.this.spec.getIoConfig().getTaskCount().intValue();
            } catch (NumberFormatException e) {
                // Non-numeric partition ids all land in group 0.
                return 0;
            }
        }

        /** @return always {@code true}: any metadata is accepted */
        protected boolean checkSourceMetadataMatch(DataSourceMetadata dataSourceMetadata) {
            return true;
        }

        /** @return always {@code true}: any task is treated as belonging to this supervisor */
        protected boolean doesTaskTypeMatchSupervisor(Task task) {
            return true;
        }

        /** @return always {@code null}; reset metadata is not modelled here */
        protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String str, Map<String, String> map) {
            return null;
        }

        /**
         * Wraps a sequence number whose ordering is the numeric ordering of its decimal string value.
         */
        public OrderedSequenceNumber<String> makeSequenceNumber(String str, boolean z) {
            return new OrderedSequenceNumber<String>(str, z) {
                public int compareTo(OrderedSequenceNumber<String> orderedSequenceNumber) {
                    return new BigInteger(get()).compareTo(new BigInteger(orderedSequenceNumber.get()));
                }
            };
        }

        /** @return always {@code null} */
        protected Map<String, Long> getRecordLagPerPartition(Map<String, String> map) {
            return null;
        }

        /** @return always {@code null} */
        protected Map<String, Long> getTimeLagPerPartition(Map<String, String> map) {
            return null;
        }

        /** @return the record supplier mock owned by the enclosing test */
        protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() {
            return SeekableStreamSupervisorStateTest.this.recordSupplier;
        }

        /** @return a fixed anonymous report payload for the test datasource and stream */
        protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int i, boolean z) {
            return new SeekableStreamSupervisorReportPayload<String, String>(SeekableStreamSupervisorStateTest.DATASOURCE, SeekableStreamSupervisorStateTest.STREAM, 1, 1, 1L, null, null, null, null, null, null, false, true, null, null, null) {
            };
        }

        /**
         * Declared under its original name (the decompiler had emitted it as {@code m139getNotSetMarker})
         * so that it can act as the superclass hook again.
         *
         * @return the marker {@code "NOT_SET"}
         */
        public String getNotSetMarker() {
            return "NOT_SET";
        }

        /**
         * Declared under its original name (the decompiler had emitted it as
         * {@code m138getEndOfPartitionMarker}) so that it can act as the superclass hook again.
         *
         * @return the marker {@code "EOF"}
         */
        public String getEndOfPartitionMarker() {
            return "EOF";
        }

        /** @return always {@code false} */
        public boolean isEndOfShard(String str) {
            return false;
        }

        /** @return always {@code false} */
        public boolean isShardExpirationMarker(String str) {
            return false;
        }

        /** @return always {@code false} */
        protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestEmitter.class */
    /**
     * Emitter that records every event it is handed so that tests can inspect them afterwards.
     */
    public static class TestEmitter extends NoopServiceEmitter {

        /** Recorded events; every read and write synchronizes on this list. */
        @GuardedBy("events")
        private final List<Event> events = new ArrayList<>();

        private TestEmitter() {
        }

        /** Appends {@code event} to the recorded events. */
        public void emit(Event event) {
            synchronized (this.events) {
                this.events.add(event);
            }
        }

        /** @return an immutable snapshot of the events recorded so far */
        public List<Event> getEvents() {
            synchronized (this.events) {
                return ImmutableList.copyOf(this.events);
            }
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestEmittingTestSeekableStreamSupervisor.class */
    /**
     * Supervisor that emits only the metric families selected by a bit mask and counts down a latch
     * for emissions, letting a test wait until the expected metrics were produced.
     */
    private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor {
        // Bit flags that may be OR-ed together to form the metric mask.
        private static final byte LAG = 1;
        private static final byte NOTICE_QUEUE = 2;
        private static final byte NOTICE_PROCESS = 4;

        private final CountDownLatch latch;
        private final Map<String, Long> partitionsRecordLag;
        private final Map<String, Long> partitionsTimeLag;
        private final byte metricFlag;

        /**
         * @param emissionLatch latch counted down on emissions
         * @param enabledMetrics bit mask built from {@code LAG}, {@code NOTICE_QUEUE}, {@code NOTICE_PROCESS}
         * @param recordLag value to hand back from {@link #getPartitionRecordLag()}
         * @param timeLag value to hand back from {@link #getPartitionTimeLag()}
         */
        TestEmittingTestSeekableStreamSupervisor(CountDownLatch emissionLatch, byte enabledMetrics, Map<String, Long> recordLag, Map<String, Long> timeLag) {
            super();
            this.partitionsTimeLag = timeLag;
            this.partitionsRecordLag = recordLag;
            this.metricFlag = enabledMetrics;
            this.latch = emissionLatch;
        }

        @Override
        @Nullable
        protected Map<String, Long> getPartitionRecordLag() {
            return this.partitionsRecordLag;
        }

        @Override
        @Nullable
        protected Map<String, Long> getPartitionTimeLag() {
            return this.partitionsTimeLag;
        }

        /** Emits lag when the LAG bit is set; the latch is counted down only in steady state. */
        protected void emitLag() {
            if ((this.metricFlag & LAG) != 0) {
                super.emitLag();
                if (this.stateManager.isSteadyState()) {
                    this.latch.countDown();
                }
            }
        }

        /** Emits the notice-queue size and counts down the latch when the NOTICE_QUEUE bit is set. */
        protected void emitNoticesQueueSize() {
            if ((this.metricFlag & NOTICE_QUEUE) != 0) {
                super.emitNoticesQueueSize();
                this.latch.countDown();
            }
        }

        /** Emits the notice process time and counts down the latch when the NOTICE_PROCESS bit is set. */
        public void emitNoticeProcessTime(String str, long j) {
            if ((this.metricFlag & NOTICE_PROCESS) != 0) {
                super.emitNoticeProcessTime(str, j);
                this.latch.countDown();
            }
        }

        /** @return always {@code null} */
        public LagStats computeLagStats() {
            return null;
        }

        /**
         * Schedules the lag and notice-queue-size emissions at a fixed rate, using the spec's start
         * delay and emission duration. The spec is queried separately for each of the two schedules.
         */
        protected void scheduleReporting(ScheduledExecutorService scheduledExecutorService) {
            SeekableStreamSupervisorIOConfig ioConfig = SeekableStreamSupervisorStateTest.this.spec.getIoConfig();

            long lagDelayMillis = ioConfig.getStartDelay().getMillis();
            long lagPeriodMillis = SeekableStreamSupervisorStateTest.this.spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis();
            scheduledExecutorService.scheduleAtFixedRate(this::emitLag, lagDelayMillis, lagPeriodMillis, TimeUnit.MILLISECONDS);

            long queueDelayMillis = ioConfig.getStartDelay().getMillis();
            long queuePeriodMillis = SeekableStreamSupervisorStateTest.this.spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis();
            scheduledExecutorService.scheduleAtFixedRate(this::emitNoticesQueueSize, queueDelayMillis, queuePeriodMillis, TimeUnit.MILLISECONDS);
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestSeekableStreamDataSourceMetadata.class */
    /**
     * Concrete {@link SeekableStreamDataSourceMetadata} with String partition ids and String offsets,
     * creatable from JSON via its {@code partitions} property.
     */
    private static class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String> {
        @JsonCreator
        public TestSeekableStreamDataSourceMetadata(@JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers) {
            super(seekableStreamSequenceNumbers);
        }

        /**
         * Converts end-offset metadata into start-offset metadata.
         *
         * @return new metadata built from {@code asStartPartitions(true)} when the wrapped sequence
         *         numbers are end sequence numbers; otherwise this instance unchanged
         */
        public DataSourceMetadata asStartMetadata() {
            final SeekableStreamSequenceNumbers<String, String> sequenceNumbers = getSeekableStreamSequenceNumbers();
            if (sequenceNumbers instanceof SeekableStreamEndSequenceNumbers) {
                // Explicit downcast: only the end-sequence subtype exposes asStartPartitions().
                final SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
                    (SeekableStreamEndSequenceNumbers<String, String>) sequenceNumbers;
                return createConcreteDataSourceMetaData(endSequenceNumbers.asStartPartitions(true));
            }
            return this;
        }

        /** @return a new instance of this class wrapping the given sequence numbers */
        protected SeekableStreamDataSourceMetadata<String, String> createConcreteDataSourceMetaData(SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers) {
            return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestSeekableStreamIndexTask.class */
    /**
     * Index task stub: it has no task runner, reports the type {@code "test"}, and hands out the
     * enclosing test's record supplier.
     */
    private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity> {
        /** Passes all arguments straight through to the superclass constructor. */
        public TestSeekableStreamIndexTask(
            String id,
            @Nullable TaskResource taskResource,
            DataSchema dataSchema,
            SeekableStreamIndexTaskTuningConfig tuningConfig,
            SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
            @Nullable Map<String, Object> context,
            @Nullable String groupId
        ) {
            super(id, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId);
        }

        /** @return always {@code null}; this stub never creates a runner */
        protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner() {
            return null;
        }

        /** @return the record supplier owned by the enclosing test, regardless of the toolbox */
        protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(TaskToolbox taskToolbox) {
            return SeekableStreamSupervisorStateTest.this.recordSupplier;
        }

        /** @return the fixed task type {@code "test"} */
        public String getType() {
            return "test";
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestSeekableStreamSupervisor.class */
    /**
     * Default supervisor for the state tests: it schedules no reporting and reports all-zero lag.
     */
    private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor {
        private TestSeekableStreamSupervisor() {
            super();
        }

        /** Intentionally empty: no periodic reporting is scheduled. */
        protected void scheduleReporting(ScheduledExecutorService scheduledExecutorService) {
            // nothing to schedule
        }

        /** @return a fresh {@link LagStats} whose three values are all zero */
        public LagStats computeLagStats() {
            final LagStats zeroLag = new LagStats(0L, 0L, 0L);
            return zeroLag;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestTaskRunnerWorkItem.class */
    /**
     * Work item that snapshots a task's type and datasource at construction time and reports a
     * caller-supplied location.
     */
    private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem {
        private final String taskType;
        private final TaskLocation location;
        private final String dataSource;

        /**
         * @param task task whose id, type and datasource are captured
         * @param result future passed to the superclass together with the task id
         * @param taskLocation location returned by {@link #getLocation()}
         */
        TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation taskLocation) {
            super(task.getId(), result);
            taskType = task.getType();
            location = taskLocation;
            dataSource = task.getDataSource();
        }

        /** @return the location given at construction */
        public TaskLocation getLocation() {
            return location;
        }

        /** @return the task's type as captured at construction */
        public String getTaskType() {
            return taskType;
        }

        /** @return the task's datasource as captured at construction */
        public String getDataSource() {
            return dataSource;
        }
    }

    /**
     * Creates fresh mocks and helpers for every test and records the expectations that all tests
     * share. The mocks are left in record state; each test adds its own expectations and then calls
     * {@code replayAll()}.
     */
    @Before
    public void setupTest() {
        this.taskStorage = (TaskStorage) createMock(TaskStorage.class);
        this.taskMaster = (TaskMaster) createMock(TaskMaster.class);
        this.taskRunner = (TaskRunner) createMock(TaskRunner.class);
        this.taskQueue = (TaskQueue) createMock(TaskQueue.class);
        this.indexerMetadataStorageCoordinator = (IndexerMetadataStorageCoordinator) createMock(IndexerMetadataStorageCoordinator.class);
        this.taskClientFactory = (SeekableStreamIndexTaskClientFactory) createMock(SeekableStreamIndexTaskClientFactory.class);
        this.spec = (SeekableStreamSupervisorSpec) createMock(SeekableStreamSupervisorSpec.class);
        this.indexTaskClient = (SeekableStreamIndexTaskClient) createMock(SeekableStreamIndexTaskClient.class);
        this.recordSupplier = (RecordSupplier) createMock(RecordSupplier.class);
        this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
        this.supervisorConfig = new SupervisorStateManagerConfig();
        this.emitter = new TestEmitter();

        // The spec hands out the shared configs, the recording emitter and the metric tags.
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.getContextValue("tags")).andReturn(METRIC_TAGS).anyTimes();

        // Any task-client build request yields the mocked client.
        EasyMock.expect(this.taskClientFactory.build(EasyMock.anyString(), (TaskInfoProvider) EasyMock.anyObject(), EasyMock.anyInt(), (SeekableStreamSupervisorTuningConfig) EasyMock.anyObject())).andReturn(this.indexTaskClient).anyTimes();

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.of(this.taskRunner)).anyTimes();
        EasyMock.expect(this.taskMaster.getTaskQueue()).andReturn(Optional.of(this.taskQueue)).anyTimes();

        // Listener registration is allowed at most once and is not required.
        this.taskRunner.registerListener((TaskRunnerListener) EasyMock.anyObject(TaskRunnerListener.class), (Executor) EasyMock.anyObject(Executor.class));
        EasyMock.expectLastCall().times(0, 1);

        // No stored metadata for the datasource. A plain null literal is used because a null cast to
        // Object does not match the typed andReturn parameter.
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();

        // The stream exposes a single assigned partition whose latest sequence number is "10".
        EasyMock.expect(this.recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
        EasyMock.expect((String) this.recordSupplier.getLatestSequenceNumber((StreamPartition) EasyMock.anyObject())).andReturn("10").anyTimes();
    }

    /**
     * Happy path: the supervisor is PENDING after start() and RUNNING after each of two consecutive
     * runs, staying healthy and free of exception events throughout.
     */
    @Test
    public void testRunning() {
        // Record phase: active spec, one known partition, no existing tasks, task submission succeeds.
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Started, but no run has happened yet.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // First run.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Second run: nothing changes.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * When the stream yields a {@code null} latest sequence number, every run records one
     * non-stream {@link ISE} exception event. The supervisor stays in CREATING_TASKS (healthy) for
     * the first two failing runs and becomes UNHEALTHY_SUPERVISOR on the third.
     */
    @Test
    public void testRunningStreamGetSequenceNumberReturnsNull() {
        // Replace the record-supplier expectations from setupTest so the latest sequence number is null.
        EasyMock.reset(this.recordSupplier);
        EasyMock.expect(this.recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
        // A plain null literal: a null cast to Object does not match the String-typed andReturn parameter.
        EasyMock.expect((String) this.recordSupplier.getLatestSequenceNumber((StreamPartition) EasyMock.anyObject())).andReturn(null).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Started, but no run has happened yet.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // First failing run: one exception event, still healthy.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        List<?> exceptionEvents = supervisor.stateManager.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Assert.assertFalse(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
        Assert.assertEquals(ISE.class.getName(), ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getExceptionClass());
        Assert.assertEquals(StringUtils.format("unable to fetch sequence number for partition[%s] from stream", SHARD_ID), ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getMessage());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Second failing run: two exception events, still healthy.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, supervisor.stateManager.getExceptionEvents().size());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Third failing run: the supervisor is now reported unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, supervisor.stateManager.getExceptionEvents().size());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Runs the supervisor three times while the record supplier mock always throws a
     * {@link StreamException} from {@code getPartitionIds}. The assertions expect the state to go
     * PENDING, then CONNECTING_TO_STREAM for the first two runs, then UNABLE_TO_CONNECT_TO_STREAM
     * (unhealthy) after the third, with the exception-event count growing 1, 2, 3 and no run ever
     * being reported as successful.
     */
    @Test
    public void testConnectingToStreamFail() {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
                .andThrow(new StreamException(new IllegalStateException(EXCEPTION_MSG)))
                .anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        // Base-typed alias: the stateManager field is read through SeekableStreamSupervisor.
        SeekableStreamSupervisor base = supervisor;
        supervisor.start();

        // Freshly started, nothing has run yet.
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, base.stateManager.getSupervisorState());
        Assert.assertTrue(base.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // First failing run: still healthy, one stream exception recorded.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        List events = base.stateManager.getExceptionEvents();
        Assert.assertEquals(1L, events.size());
        Object firstEvent = events.get(0);
        Assert.assertTrue(
                ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(
                IllegalStateException.class.getName(),
                ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass());
        Assert.assertEquals(
                StringUtils.format("%s: %s", IllegalStateException.class.getName(), EXCEPTION_MSG),
                ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Second failing run: same state, two events.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Third failing run: the supervisor is now expected to be unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Twelve supervisor runs against a record supplier mock recorded as: throw x3, succeed x3,
     * throw x3, succeed x3. The assertions expect the state to become UNABLE_TO_CONNECT_TO_STREAM
     * on run 3, return to RUNNING on run 6, become LOST_CONTACT_WITH_STREAM on run 9 and return to
     * RUNNING on run 12.
     */
    @Test
    public void testConnectingToStreamFailRecoveryFailRecovery() {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
                .andThrow(new StreamException(new IllegalStateException()))
                .times(3);
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
                .andThrow(new StreamException(new IllegalStateException()))
                .times(3);
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        // Base-typed alias: the stateManager field is read through SeekableStreamSupervisor.
        SeekableStreamSupervisor base = supervisor;
        supervisor.start();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, base.stateManager.getSupervisorState());

        // Runs 1-2: failing, but still reported healthy and connecting.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertTrue(base.stateManager.isHealthy());
            Assert.assertEquals(
                    SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM,
                    base.stateManager.getSupervisorState());
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.RUNNING,
                    base.stateManager.getSupervisorState().getBasicState());
        }

        // Run 3: third failure, expected to be unable to connect.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Runs 4-5: the state is expected to remain unable-to-connect.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertEquals(
                    SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM,
                    base.stateManager.getSupervisorState());
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                    base.stateManager.getSupervisorState().getBasicState());
        }

        // Run 6: back to RUNNING, and a successful run is now on record.
        supervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        // Runs 7-8: the state is expected to remain RUNNING.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
        }

        // Run 9: contact with the stream is expected to be reported as lost.
        supervisor.runInternal();
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        // Run 10: still lost and unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState().getBasicState());

        // Run 11: still lost and unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM,
                base.stateManager.getSupervisorState());

        // Run 12: healthy and RUNNING again.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Nine supervisor runs against a task storage mock whose {@code getActiveTasksByDatasource} is
     * recorded as: throw x3, return an empty list x6, throw x3. The assertions expect
     * DISCOVERING_INITIAL_TASKS for runs 1-2, UNHEALTHY_SUPERVISOR for runs 3-5, RUNNING for
     * runs 6-8 and UNHEALTHY_SUPERVISOR again on run 9.
     */
    @Test
    public void testDiscoveringInitialTasksFailRecoveryFail() {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
                .andThrow(new IllegalStateException(EXCEPTION_MSG))
                .times(3);
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).times(6);
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
                .andThrow(new IllegalStateException(EXCEPTION_MSG))
                .times(3);
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        // Base-typed alias: the stateManager field is read through SeekableStreamSupervisor.
        SeekableStreamSupervisor base = supervisor;
        supervisor.start();

        // Run 1: first failure, recorded as a non-stream exception.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        List events = base.stateManager.getExceptionEvents();
        Assert.assertEquals(1L, events.size());
        Object firstEvent = events.get(0);
        Assert.assertFalse(
                ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(
                IllegalStateException.class.getName(),
                ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass());
        Assert.assertEquals(EXCEPTION_MSG, ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Run 2: second failure, still healthy.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(2L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Run 3: third failure, expected to be unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Runs 4-5: a successful run is on record, but the state is still expected to be unhealthy.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertFalse(base.stateManager.isHealthy());
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                    base.stateManager.getSupervisorState());
            Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
            Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());
        }

        // Run 6: healthy and RUNNING again.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        // Runs 7-8: the state is expected to remain RUNNING.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertTrue(base.stateManager.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
            Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());
        }

        // Run 9: expected to be unhealthy again.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Re-records the spec mock with an IO config carrying {@code new IdleConfig(true, 200L)}, then
     * runs the supervisor four times with a 100 ms sleep between runs. The assertions expect
     * RUNNING after the first two runs and IDLE after the last two, with the supervisor healthy
     * and no exception events throughout.
     */
    @Test
    public void testIdleStateTransition() throws Exception {
        EasyMock.reset(spec);
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        // Anonymous subclass with an empty body; only the constructor arguments matter here.
        // NOTE(review): IdleConfig(true, 200L) presumably means "idle enabled, 200 ms threshold" - confirm.
        EasyMock.expect(spec.getIoConfig()).andReturn(
                new SeekableStreamSupervisorIOConfig(
                        STREAM,
                        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
                        1,
                        1,
                        new Period("PT1H"),
                        new Period("PT1S"),
                        new Period("PT30S"),
                        false,
                        new Period("PT30M"),
                        null,
                        null,
                        null,
                        null,
                        new IdleConfig(true, 200L),
                        null) {
                }).anyTimes();
        EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
        // Monitor scheduler config whose emission duration is fixed at one second.
        EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
            public Duration getEmissionDuration() {
                return new Period("PT1S").toStandardDuration();
            }
        }).anyTimes();
        EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
        EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        // Base-typed alias: the stateManager field is read through SeekableStreamSupervisor.
        SeekableStreamSupervisor base = supervisor;
        supervisor.start();

        // Freshly started, nothing has run yet.
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.PENDING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(base.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Expected state after each of the four runs, in order.
        SupervisorStateManager.BasicState[] expectedAfterRun = {
            SupervisorStateManager.BasicState.RUNNING,
            SupervisorStateManager.BasicState.RUNNING,
            SupervisorStateManager.BasicState.IDLE,
            SupervisorStateManager.BasicState.IDLE
        };
        for (int run = 0; run < expectedAfterRun.length; run++) {
            if (run > 0) {
                // Pause between consecutive runs, never before the first one.
                Thread.sleep(100L);
            }
            supervisor.runInternal();
            Assert.assertTrue(base.stateManager.isHealthy());
            Assert.assertEquals(expectedAfterRun[run], base.stateManager.getSupervisorState());
            Assert.assertEquals(expectedAfterRun[run], base.stateManager.getSupervisorState().getBasicState());
            Assert.assertTrue(base.stateManager.getExceptionEvents().isEmpty());
            Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());
        }

        verifyAll();
    }

    /**
     * Nine supervisor runs against a task queue mock whose {@code add} is recorded as: throw x3,
     * return true x3, throw x3. The assertions expect CREATING_TASKS for runs 1-2,
     * UNHEALTHY_SUPERVISOR for runs 3-5, RUNNING for runs 6-8 and UNHEALTHY_SUPERVISOR again on
     * run 9.
     */
    @Test
    public void testCreatingTasksFailRecoveryFail() {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject()))
                .andThrow(new IllegalStateException(EXCEPTION_MSG))
                .times(3);
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).times(3);
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject()))
                .andThrow(new IllegalStateException(EXCEPTION_MSG))
                .times(3);
        EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        // Base-typed alias: the stateManager field is read through SeekableStreamSupervisor.
        SeekableStreamSupervisor base = supervisor;
        supervisor.start();

        // Run 1: first failure, recorded as a non-stream exception.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        List events = base.stateManager.getExceptionEvents();
        Assert.assertEquals(1L, events.size());
        Object firstEvent = events.get(0);
        Assert.assertFalse(
                ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(
                IllegalStateException.class.getName(),
                ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass());
        Assert.assertEquals(EXCEPTION_MSG, ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Run 2: second failure, still healthy.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Run 3: third failure, expected to be unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Runs 4-5: a successful run is on record, but the state is still expected to be unhealthy.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertFalse(base.stateManager.isHealthy());
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                    base.stateManager.getSupervisorState());
            Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
            Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());
        }

        // Run 6: healthy and RUNNING again.
        supervisor.runInternal();
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, base.stateManager.getExceptionEvents().size());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        // Runs 7-8: the state is expected to remain RUNNING.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertTrue(base.stateManager.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, base.stateManager.getSupervisorState());
            Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());
        }

        // Run 9: expected to be unhealthy again.
        supervisor.runInternal();
        Assert.assertFalse(base.stateManager.isHealthy());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                base.stateManager.getSupervisorState());
        Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * With the spec mock reporting {@code isSuspended() == true}, the supervisor is expected to
     * start in PENDING and then report SUSPENDED (healthy, no exception events, successful run
     * recorded) after each of two runs.
     */
    @Test
    public void testSuspended() {
        EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        // Base-typed alias: the stateManager field is read through SeekableStreamSupervisor.
        SeekableStreamSupervisor base = supervisor;
        supervisor.start();

        // Freshly started, nothing has run yet.
        Assert.assertTrue(base.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, base.stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.PENDING,
                base.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(base.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(base.stateManager.isAtLeastOneSuccessfulRun());

        // Both runs are expected to leave the supervisor in the same SUSPENDED state.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertTrue(base.stateManager.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, base.stateManager.getSupervisorState());
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.SUSPENDED,
                    base.stateManager.getSupervisorState().getBasicState());
            Assert.assertTrue(base.stateManager.getExceptionEvents().isEmpty());
            Assert.assertTrue(base.stateManager.isAtLeastOneSuccessfulRun());
        }

        verifyAll();
    }

    /**
     * Checks the state transitions PENDING -> RUNNING -> STOPPING when the supervisor is started,
     * run once and then stopped with {@code stop(false)}.
     */
    @Test
    public void testStopping() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        // Void calls recorded before replayAll(); verifyAll() below checks that they happened.
        this.taskRunner.unregisterListener("testSupervisorId");
        this.indexTaskClient.close();
        this.recordSupplier.close();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager stateManager = ((SeekableStreamSupervisor) supervisor).stateManager;

        // Started but not yet run.
        Assert.assertTrue(stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(stateManager.isAtLeastOneSuccessfulRun());

        // After one run.
        supervisor.runInternal();
        Assert.assertTrue(stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(stateManager.isAtLeastOneSuccessfulRun());

        // After stop(false).
        supervisor.stop(false);
        Assert.assertTrue(stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, stateManager.getSupervisorState().getBasicState());
        verifyAll();
    }

    /**
     * Checks the state transitions PENDING -> RUNNING -> STOPPING when the supervisor is stopped
     * with {@code stop(true)}, and that a further {@code runInternal()} leaves it in STOPPING.
     */
    @Test
    public void testStoppingGracefully() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        // Void calls recorded before replayAll(); verifyAll() below checks that they happened.
        this.taskRunner.unregisterListener("testSupervisorId");
        this.indexTaskClient.close();
        this.recordSupplier.close();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager stateManager = ((SeekableStreamSupervisor) supervisor).stateManager;

        // Started but not yet run.
        Assert.assertTrue(stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(stateManager.isAtLeastOneSuccessfulRun());

        // After one run.
        supervisor.runInternal();
        Assert.assertTrue(stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(stateManager.isAtLeastOneSuccessfulRun());

        // After stop(true).
        supervisor.stop(true);
        Assert.assertTrue(stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, stateManager.getSupervisorState().getBasicState());

        // Running again after the stop request must not change the reported state.
        supervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, stateManager.getSupervisorState().getBasicState());
        verifyAll();
    }

    /**
     * Starts a supervisor with two running tasks ("id1" and "id2"), runs it once, submits a
     * checkpoint for task group 0 and waits until the notices queue is empty again.
     *
     * <p>The wait is a polling loop; the {@code timeout} on the {@code @Test} annotation bounds it
     * if the queue never drains.
     *
     * @throws InterruptedException if the thread is interrupted while polling the notices queue
     * @throws JsonProcessingException if the checkpoints placed in the task context cannot be serialized
     */
    @Test(timeout = 60000)
    public void testCheckpointForActiveTaskGroup() throws InterruptedException, JsonProcessingException {
        DateTime startTime = DateTimes.nowUtc();
        SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
                STREAM,
                new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
                1, 1, new Period("PT1H"), new Period("PT1S"), new Period("PT30S"), false, new Period("PT30M"),
                null, null, null, null, new IdleConfig(true, 200L), null) {
        };

        // Re-record the spec mock so that it hands out the IO config built above.
        EasyMock.reset(this.spec);
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(ioConfig).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
            public Duration getEmissionDuration() {
                return new Period("PT2S").toStandardDuration();
            }
        }).anyTimes();
        EasyMock.expect(this.spec.getType()).andReturn(STREAM).anyTimes();
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getContextValue("tags")).andReturn("").anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();

        SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(0, Collections.singletonMap(SHARD_ID, "10"), Collections.singletonMap(SHARD_ID, "20"), "test", startTime, null, Collections.emptySet(), ioConfig);
        SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

        // Checkpoints that are serialized to JSON and stored in the task context.
        TreeMap<Integer, Map<String, Long>> contextCheckpoints = new TreeMap<>();
        contextCheckpoints.put(0, ImmutableMap.of(SHARD_ID, 10L, "1", 20L));
        // Raw type kept on purpose: the context parameter type of the task constructor is declared elsewhere.
        HashMap taskContext = new HashMap();
        taskContext.put("checkpoints", new ObjectMapper().writeValueAsString(contextCheckpoints));

        TestSeekableStreamIndexTask task1 = new TestSeekableStreamIndexTask("id1", null, getDataSchema(), taskTuningConfig, taskIoConfig, taskContext, SHARD_ID);
        TestSeekableStreamIndexTask task2 = new TestSeekableStreamIndexTask("id2", null, getDataSchema(), taskTuningConfig, taskIoConfig, taskContext, SHARD_ID);
        TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
        TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);

        // Raw type kept on purpose: the element type expected by getRunningTasks() is declared elsewhere.
        ArrayList runningWorkItems = new ArrayList();
        runningWorkItems.add(new TestTaskRunnerWorkItem(task1, null, location1));
        runningWorkItems.add(new TestTaskRunnerWorkItem(task2, null, location2));
        EasyMock.expect(this.taskRunner.getRunningTasks()).andReturn(runningWorkItems).anyTimes();

        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task1, task2)).anyTimes();
        EasyMock.expect(this.taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
        EasyMock.expect(this.taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
        EasyMock.expect(this.taskStorage.getTask("id1")).andReturn(Optional.of(task1)).anyTimes();
        EasyMock.expect(this.taskStorage.getTask("id2")).andReturn(Optional.of(task2)).anyTimes();

        EasyMock.reset(this.indexerMetadataStorageCoordinator);
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();

        EasyMock.expect(this.indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
        EasyMock.expect(this.indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
        EasyMock.expect(this.indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
        EasyMock.expect(this.indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();

        // Offsets and checkpoints returned by the mocked task client for both tasks.
        // Raw types kept on purpose: they are passed to project APIs whose generic signatures are declared elsewhere.
        ImmutableMap partitionOffsets = ImmutableMap.of(SHARD_ID, "10");
        TreeMap taskCheckpoints = new TreeMap();
        taskCheckpoints.put(0, partitionOffsets);
        EasyMock.expect(this.indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())).andReturn(Futures.immediateFuture(taskCheckpoints)).anyTimes();
        EasyMock.expect(this.indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())).andReturn(Futures.immediateFuture(taskCheckpoints)).anyTimes();
        EasyMock.expect(this.indexTaskClient.pauseAsync("id1")).andReturn(Futures.immediateFuture(partitionOffsets)).anyTimes();
        EasyMock.expect(this.indexTaskClient.pauseAsync("id2")).andReturn(Futures.immediateFuture(partitionOffsets)).anyTimes();
        EasyMock.expect(this.indexTaskClient.setEndOffsetsAsync("id1", partitionOffsets, false)).andReturn(Futures.immediateFuture(true)).anyTimes();
        EasyMock.expect(this.indexTaskClient.setEndOffsetsAsync("id2", partitionOffsets, false)).andReturn(Futures.immediateFuture(true)).anyTimes();
        EasyMock.expect(this.indexTaskClient.resumeAsync("id1")).andReturn(Futures.immediateFuture(true)).anyTimes();
        EasyMock.expect(this.indexTaskClient.resumeAsync("id2")).andReturn(Futures.immediateFuture(true)).anyTimes();
        EasyMock.expect(this.indexTaskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)).anyTimes();
        EasyMock.expect(this.indexTaskClient.stopAsync("id2", false)).andReturn(Futures.immediateFuture(true)).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.runInternal();
        supervisor.checkpoint(0, new TestSeekableStreamDataSourceMetadata(new SeekableStreamStartSequenceNumbers(STREAM, (Map) taskCheckpoints.get(0), ImmutableSet.of())));

        // Poll until every queued notice has been taken off the queue.
        while (supervisor.getNoticesQueueSize() > 0) {
            Thread.sleep(100L);
        }
        verifyAll();
        // assertEquals reports the actual queue size on failure, unlike assertTrue(size == 0).
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
    }

    /**
     * Checks that a supervisor given both record-lag and time-lag maps emits the six lag metrics
     * (total, max and average for each kind) in order, with the expected values and tags.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testEmitBothLag() throws Exception {
        expectEmitterSupervisor(false);
        CountDownLatch latch = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(latch, (byte) 1, ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L));
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        latch.await();

        // Expected metrics in emission order, paired index-by-index with their values.
        String[] metricNames = {"ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time"};
        long[] metricValues = {850L, 500L, 283L, 45000L, 20000L, 15000L};

        List<Event> events = filterMetrics(this.emitter.getEvents(), Arrays.asList(metricNames));
        Assert.assertEquals(6L, events.size());
        for (int i = 0; i < metricNames.length; i++) {
            Assert.assertEquals(metricNames[i], events.get(i).toMap().get("metric"));
            Assert.assertEquals(metricValues[i], events.get(i).toMap().get("value"));
            Assert.assertEquals(METRIC_TAGS, events.get(i).toMap().get("tags"));
        }
        verifyAll();
    }

    /**
     * Checks that a supervisor given only a record-lag map emits the three record-lag metrics
     * (total, max, average) in order, with the expected values and tags.
     *
     * <p>The tags are asserted here as well, so that this test checks the same fields of each
     * record-lag event as {@code testEmitBothLag} does.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testEmitRecordLag() throws Exception {
        expectEmitterSupervisor(false);
        CountDownLatch latch = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(latch, (byte) 1, ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), null);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        latch.await();

        // Expected metrics in emission order, paired index-by-index with their values.
        String[] metricNames = {"ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag"};
        long[] metricValues = {850L, 500L, 283L};

        List<Event> events = filterMetrics(this.emitter.getEvents(), Arrays.asList(metricNames));
        Assert.assertEquals(3L, events.size());
        for (int i = 0; i < metricNames.length; i++) {
            Assert.assertEquals(metricNames[i], events.get(i).toMap().get("metric"));
            Assert.assertEquals(metricValues[i], events.get(i).toMap().get("value"));
            Assert.assertEquals(METRIC_TAGS, events.get(i).toMap().get("tags"));
        }
        verifyAll();
    }

    /**
     * Checks that a supervisor given only a time-lag map emits the three time-lag metrics
     * (total, max, average) in order, with the expected values and tags.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testEmitTimeLag() throws Exception {
        expectEmitterSupervisor(false);
        CountDownLatch latch = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(latch, (byte) 1, null, ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L));
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        latch.await();

        // Expected metrics in emission order, paired index-by-index with their values.
        String[] metricNames = {"ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time"};
        long[] metricValues = {45000L, 20000L, 15000L};

        List<Event> events = filterMetrics(this.emitter.getEvents(), Arrays.asList(metricNames));
        Assert.assertEquals(3L, events.size());
        for (int i = 0; i < metricNames.length; i++) {
            Assert.assertEquals(metricNames[i], events.get(i).toMap().get("metric"));
            Assert.assertEquals(metricValues[i], events.get(i).toMap().get("value"));
            Assert.assertEquals(METRIC_TAGS, events.get(i).toMap().get("tags"));
        }
        verifyAll();
    }

    /**
     * Checks that exactly one "ingest/notices/queueSize" metric is emitted, with value 0 and the
     * expected tags and data source.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testEmitNoticesQueueSize() throws Exception {
        expectEmitterSupervisor(false);
        CountDownLatch latch = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(latch, (byte) 2, null, null);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        latch.await();

        List<Event> events = filterMetrics(this.emitter.getEvents(), Collections.singletonList("ingest/notices/queueSize"));
        Assert.assertEquals(1L, events.size());

        Event queueSizeEvent = events.get(0);
        Assert.assertEquals("ingest/notices/queueSize", queueSizeEvent.toMap().get("metric"));
        Assert.assertEquals(0, queueSizeEvent.toMap().get("value"));
        Assert.assertEquals(METRIC_TAGS, queueSizeEvent.toMap().get("tags"));
        Assert.assertEquals(DATASOURCE, queueSizeEvent.toMap().get("dataSource"));
        verifyAll();
    }

    /**
     * Checks that exactly one "ingest/notices/time" metric is emitted, with a positive value,
     * the expected tags and data source, and notice type "run_notice".
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testEmitNoticesTime() throws Exception {
        expectEmitterSupervisor(false);
        CountDownLatch latch = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(latch, (byte) 4, null, null);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        latch.await();

        List<Event> events = filterMetrics(this.emitter.getEvents(), Collections.singletonList("ingest/notices/time"));
        Assert.assertEquals(1L, events.size());

        Event noticeTimeEvent = events.get(0);
        Assert.assertEquals("ingest/notices/time", noticeTimeEvent.toMap().get("metric"));
        Assert.assertEquals(METRIC_TAGS, noticeTimeEvent.toMap().get("tags"));
        // The emitted value itself is used as the failure message.
        Assert.assertTrue(String.valueOf(noticeTimeEvent.toMap().get("value")), ((Long) noticeTimeEvent.toMap().get("value")).longValue() > 0);
        Assert.assertEquals(DATASOURCE, noticeTimeEvent.toMap().get("dataSource"));
        Assert.assertEquals("run_notice", noticeTimeEvent.toMap().get("noticeType"));
        verifyAll();
    }

    /**
     * Checks that a supervisor in the SUSPENDED state emits none of the six lag metrics, even
     * though both lag maps are supplied.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testEmitNoLagWhenSuspended() throws Exception {
        expectEmitterSupervisor(true);
        CountDownLatch latch = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(latch, (byte) 1, ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L));
        supervisor.start();
        supervisor.runInternal();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());

        latch.await();

        List<Event> lagEvents = filterMetrics(this.emitter.getEvents(), Arrays.asList("ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time"));
        Assert.assertEquals(0L, lagEvents.size());
        verifyAll();
    }

    /**
     * Checks that {@code getStats()} returns a single entry keyed by {@code SHARD_ID} that holds
     * the moving averages of both the actively-reading task and the pending-completion task.
     */
    @Test
    public void testGetStats() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.expect(this.indexTaskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of("prop1", "val1"))).times(1);
        EasyMock.expect(this.indexTaskClient.getMovingAveragesAsync("task2")).andReturn(Futures.immediateFuture(ImmutableMap.of("prop2", "val2"))).times(1);
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        // "task1" goes into an actively-reading group, "task2" into a pending-completion group.
        supervisor.addTaskGroupToActivelyReadingTaskGroup(supervisor.getTaskGroupIdForPartition(SHARD_ID), ImmutableMap.of(SHARD_ID, SHARD_ID), Optional.absent(), Optional.absent(), ImmutableSet.of("task1"), ImmutableSet.of());
        supervisor.addTaskGroupToPendingCompletionTaskGroup(supervisor.getTaskGroupIdForPartition("1"), ImmutableMap.of("1", SHARD_ID), Optional.absent(), Optional.absent(), ImmutableSet.of("task2"), ImmutableSet.of());

        Map statsByGroup = supervisor.getStats();
        verifyAll();

        Assert.assertEquals(1L, statsByGroup.size());
        Assert.assertEquals(ImmutableSet.of(SHARD_ID), statsByGroup.keySet());
        Assert.assertEquals(ImmutableMap.of("task1", ImmutableMap.of("prop1", "val1"), "task2", ImmutableMap.of("prop2", "val2")), statsByGroup.get(SHARD_ID));
    }

    /**
     * Calls {@code reset(null)} on a supervisor holding one actively-reading and one
     * pending-completion task group, then validates the resulting state through
     * {@code validateSupervisorStateAfterResetOffsets} with an empty offset map.
     *
     * @throws InterruptedException declared for the validation helper
     */
    @Test
    public void testSupervisorResetAllWithCheckpoints() throws InterruptedException {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.expect(this.indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
        // Record the expected void call on the task queue mock.
        this.taskQueue.shutdown("task1", "DataSourceMetadata is not found while reset");
        EasyMock.expectLastCall();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(supervisor.getTaskGroupIdForPartition(SHARD_ID), ImmutableMap.of(SHARD_ID, "5"), Optional.absent(), Optional.absent(), ImmutableSet.of("task1"), ImmutableSet.of());
        supervisor.addTaskGroupToPendingCompletionTaskGroup(supervisor.getTaskGroupIdForPartition("1"), ImmutableMap.of("1", "6"), Optional.absent(), Optional.absent(), ImmutableSet.of("task2"), ImmutableSet.of());

        // State before the reset.
        Assert.assertEquals(1L, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
        Assert.assertEquals(0L, supervisor.getPartitionOffsets().size());

        supervisor.reset(null);
        validateSupervisorStateAfterResetOffsets(supervisor, ImmutableMap.of(), 0);
    }

    /**
     * Calls {@code resetOffsets} for two of the four partitions held by a single actively-reading
     * task group. The metadata coordinator mock expects the stored metadata to be reset to the
     * checkpointed offsets with those two partitions overridden.
     *
     * @throws InterruptedException declared for the validation helper
     * @throws IOException declared by the original test signature
     */
    @Test
    public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws InterruptedException, IOException {
        // Offsets currently stored, offsets requested by the reset, and the expected merged result.
        ImmutableMap checkpointOffsets = ImmutableMap.of(SHARD_ID, SHARD_ID, "1", "10", "2", "20", "3", "30");
        ImmutableMap<String, String> resetOffsets = ImmutableMap.of(SHARD_ID, "1000", "2", "2500");
        ImmutableMap expectedOffsets = ImmutableMap.of(SHARD_ID, "1000", "1", "10", "2", "2500", "3", "30");

        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.reset(this.indexerMetadataStorageCoordinator);
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers(STREAM, checkpointOffsets)));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers(STREAM, expectedOffsets)))).andReturn(true);
        // Record the expected void call on the task queue mock.
        this.taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
        EasyMock.expectLastCall();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(supervisor.getTaskGroupIdForPartition(SHARD_ID), checkpointOffsets, Optional.absent(), Optional.absent(), ImmutableSet.of("task1"), ImmutableSet.of());
        TestSeekableStreamDataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers(STREAM, resetOffsets));

        // State before the reset.
        Assert.assertEquals(1L, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
        Assert.assertEquals(0L, supervisor.getPartitionOffsets().size());

        supervisor.resetOffsets(resetMetadata);
        validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
    }

    /**
     * Checks that {@code getActiveRealtimeSequencePrefixes()} returns three entries when the
     * supervisor holds two actively-reading task groups and one pending-completion task group.
     */
    @Test
    public void testGetActiveRealtimeSequencePrefixes() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.getIoConfig().setTaskCount(3);
        supervisor.start();

        // Two actively-reading groups ...
        supervisor.addTaskGroupToActivelyReadingTaskGroup(supervisor.getTaskGroupIdForPartition(SHARD_ID), ImmutableMap.of(SHARD_ID, "5"), Optional.absent(), Optional.absent(), ImmutableSet.of("task1"), ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(supervisor.getTaskGroupIdForPartition("1"), ImmutableMap.of("1", "6"), Optional.absent(), Optional.absent(), ImmutableSet.of("task2"), ImmutableSet.of());
        // ... and one pending-completion group.
        supervisor.addTaskGroupToPendingCompletionTaskGroup(supervisor.getTaskGroupIdForPartition("2"), ImmutableMap.of("2", "100"), Optional.absent(), Optional.absent(), ImmutableSet.of("task3"), ImmutableSet.of());

        Assert.assertEquals(3L, supervisor.getActiveRealtimeSequencePrefixes().size());
    }

    /**
     * Resets offsets for two of three partitions while the metadata store already holds
     * checkpoints for all three.
     * <p>
     * The mocked coordinator returns the stored checkpoints and expects
     * {@code resetDataSourceMetadata} to be called with the reset offsets for the two partitions
     * plus the untouched stored offset for partition "2". Shutdowns are expected for "task1" and
     * "task2"; the final state is checked by {@code validateSupervisorStateAfterResetOffsets}
     * with one remaining active task group.
     */
    @Test
    public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException {
        final ImmutableMap<String, String> checkpointedOffsets = ImmutableMap.of(SHARD_ID, "5", "1", "6", "2", "100");
        final ImmutableMap<String, String> resetOffsets = ImmutableMap.of(SHARD_ID, "10", "1", "8");
        final ImmutableMap<String, String> expectedStoredOffsets = ImmutableMap.of(SHARD_ID, "10", "1", "8", "2", "100");
        final String shutdownReason = "DataSourceMetadata is updated while reset offsets is called";

        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.reset(this.indexerMetadataStorageCoordinator);

        final TestSeekableStreamDataSourceMetadata storedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, checkpointedOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
                .andReturn(storedMetadata);

        final TestSeekableStreamDataSourceMetadata expectedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, expectedStoredOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, expectedMetadata))
                .andReturn(true);

        this.taskQueue.shutdown("task1", shutdownReason);
        EasyMock.expectLastCall();
        this.taskQueue.shutdown("task2", shutdownReason);
        EasyMock.expectLastCall();
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.getIoConfig().setTaskCount(3);
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, "5"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", "6"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition("2"),
                ImmutableMap.of("2", "100"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task3"),
                ImmutableSet.of());

        final TestSeekableStreamDataSourceMetadata resetMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, resetOffsets));

        // State before the reset: three active groups, nothing queued, no partition offsets.
        Assert.assertEquals(3L, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
        Assert.assertEquals(0L, supervisor.getPartitionOffsets().size());

        supervisor.resetOffsets(resetMetadata);
        validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
    }

    /**
     * Resets offsets for two partitions when the metadata store holds no checkpoint for the
     * datasource.
     * <p>
     * {@code retrieveDataSourceMetadata} is stubbed to return {@code null}, and the test expects
     * the reset offsets to be written with {@code insertDataSourceMetadata}. Shutdowns are
     * expected for "task1" and "task2"; the final state is checked by
     * {@code validateSupervisorStateAfterResetOffsets} with one remaining active task group.
     */
    @Test
    public void testSupervisorResetOffsetsWithNoCheckpoints() throws InterruptedException {
        final ImmutableMap<String, String> resetOffsets = ImmutableMap.of(SHARD_ID, "10", "1", "8");
        final ImmutableMap<String, String> expectedStoredOffsets = ImmutableMap.copyOf(resetOffsets);
        final String shutdownReason = "DataSourceMetadata is updated while reset offsets is called";

        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.reset(this.indexerMetadataStorageCoordinator);

        // No stored metadata. A plain null is used: an (Object)-typed null does not match the
        // return type that andReturn expects for retrieveDataSourceMetadata.
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
                .andReturn(null);

        final TestSeekableStreamDataSourceMetadata expectedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, expectedStoredOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.insertDataSourceMetadata(DATASOURCE, expectedMetadata))
                .andReturn(true);

        this.taskQueue.shutdown("task1", shutdownReason);
        EasyMock.expectLastCall();
        this.taskQueue.shutdown("task2", shutdownReason);
        EasyMock.expectLastCall();
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.getIoConfig().setTaskCount(3);
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, "5"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", "6"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition("2"),
                ImmutableMap.of("2", "100"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task3"),
                ImmutableSet.of());

        final TestSeekableStreamDataSourceMetadata resetMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, resetOffsets));

        // State before the reset: three active groups, nothing queued, no partition offsets.
        Assert.assertEquals(3L, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
        Assert.assertEquals(0L, supervisor.getPartitionOffsets().size());

        supervisor.resetOffsets(resetMetadata);
        validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
    }

    /**
     * Calls {@code resetOffsets} with metadata that names no partitions.
     * <p>
     * The mocked coordinator returns stored checkpoints for two partitions and expects
     * {@code resetDataSourceMetadata} to be called with those same offsets. No task shutdown is
     * expected, and {@code validateSupervisorStateAfterResetOffsets} is called expecting both
     * task groups to remain active.
     */
    @Test
    public void testSupervisorResetWithNoPartitions() throws IOException, InterruptedException {
        final ImmutableMap<String, String> checkpointedOffsets = ImmutableMap.of(SHARD_ID, "5", "1", "6");
        final ImmutableMap<String, String> resetOffsets = ImmutableMap.of();
        final ImmutableMap<String, String> expectedStoredOffsets = ImmutableMap.of(SHARD_ID, "5", "1", "6");

        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.reset(this.indexerMetadataStorageCoordinator);

        final TestSeekableStreamDataSourceMetadata storedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, checkpointedOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
                .andReturn(storedMetadata);

        final TestSeekableStreamDataSourceMetadata expectedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, expectedStoredOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, expectedMetadata))
                .andReturn(true);
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.getIoConfig().setTaskCount(2);
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, "5"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", "6"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());

        final TestSeekableStreamDataSourceMetadata resetMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, resetOffsets));

        // State before the reset: two active groups, nothing queued, no partition offsets.
        Assert.assertEquals(2L, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
        Assert.assertEquals(0L, supervisor.getPartitionOffsets().size());

        supervisor.resetOffsets(resetMetadata);
        validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 2);
    }

    /**
     * Calls {@code resetOffsets} with an offset for partition "2", which has no stored checkpoint
     * and no task group registered in this test.
     * <p>
     * The mocked coordinator returns stored checkpoints for two partitions and expects
     * {@code resetDataSourceMetadata} to be called with those offsets plus the new partition's
     * offset. A shutdown is expected for "task1" only, and
     * {@code validateSupervisorStateAfterResetOffsets} is called expecting one active task group.
     */
    @Test
    public void testSupervisorResetWithNewPartition() throws IOException, InterruptedException {
        final ImmutableMap<String, String> checkpointedOffsets = ImmutableMap.of(SHARD_ID, "5", "1", "6");
        final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("2", "20");
        final ImmutableMap<String, String> expectedStoredOffsets = ImmutableMap.of(SHARD_ID, "5", "1", "6", "2", "20");

        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        EasyMock.reset(this.indexerMetadataStorageCoordinator);

        final TestSeekableStreamDataSourceMetadata storedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, checkpointedOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
                .andReturn(storedMetadata);

        final TestSeekableStreamDataSourceMetadata expectedMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, expectedStoredOffsets));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, expectedMetadata))
                .andReturn(true);

        this.taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
        EasyMock.expectLastCall();
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.getIoConfig().setTaskCount(2);
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, "5"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", "6"),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());

        final TestSeekableStreamDataSourceMetadata resetMetadata =
                new TestSeekableStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, resetOffsets));

        // State before the reset: two active groups, nothing queued, no partition offsets.
        Assert.assertEquals(2L, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(0L, supervisor.getNoticesQueueSize());
        Assert.assertEquals(0L, supervisor.getPartitionOffsets().size());

        supervisor.resetOffsets(resetMetadata);
        validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
    }

    /**
     * Expects {@code resetOffsets(null)} to throw a {@link DruidException} classified as invalid
     * input, with a message saying that reset metadata is required.
     */
    @Test
    public void testSupervisorNoResetDataSourceMetadata() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, SHARD_ID),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToPendingCompletionTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", SHARD_ID),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());
        verifyAll();

        final DruidException thrown = Assert.assertThrows(
                DruidException.class,
                () -> supervisor.resetOffsets(null));
        MatcherAssert.assertThat(
                thrown,
                DruidExceptionMatcher.invalidInput()
                        .expectMessageIs("Reset dataSourceMetadata is required for resetOffsets."));
    }

    /**
     * Expects {@code resetOffsets} to reject metadata built from
     * {@code SeekableStreamStartSequenceNumbers} with an invalid-input {@link DruidException}
     * whose message names the expected and the found sequence-number types.
     */
    @Test
    public void testSupervisorResetWithInvalidStartSequenceMetadata() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, SHARD_ID),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToPendingCompletionTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", SHARD_ID),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());
        verifyAll();

        // Start (not end) sequence numbers: this is the part the supervisor is expected to reject.
        final TestSeekableStreamDataSourceMetadata startSequenceMetadata = new TestSeekableStreamDataSourceMetadata(
                new SeekableStreamStartSequenceNumbers("i-am-not-real", ImmutableMap.of(SHARD_ID, "10", "1", "20", "2", "30"), ImmutableSet.of()));

        final String expectedMessage = StringUtils.format(
                "Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[SeekableStreamEndSequenceNumbers], but found[SeekableStreamStartSequenceNumbers].",
                startSequenceMetadata);
        final DruidException thrown = Assert.assertThrows(
                DruidException.class,
                () -> supervisor.resetOffsets(startSequenceMetadata));
        MatcherAssert.assertThat(thrown, DruidExceptionMatcher.invalidInput().expectMessageIs(expectedMessage));
    }

    /**
     * Expects {@code resetOffsets} to reject end-sequence metadata that names the stream
     * "i-am-not-real" with an invalid-input {@link DruidException} reporting the unknown stream
     * and the stream the supervisor is consuming.
     */
    @Test
    public void testSupervisorResetInvalidStream() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        replayAll();

        final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
                supervisor.getTaskGroupIdForPartition(SHARD_ID),
                ImmutableMap.of(SHARD_ID, SHARD_ID),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task1"),
                ImmutableSet.of());
        supervisor.addTaskGroupToPendingCompletionTaskGroup(
                supervisor.getTaskGroupIdForPartition("1"),
                ImmutableMap.of("1", SHARD_ID),
                Optional.absent(),
                Optional.absent(),
                ImmutableSet.of("task2"),
                ImmutableSet.of());
        verifyAll();

        final TestSeekableStreamDataSourceMetadata wrongStreamMetadata = new TestSeekableStreamDataSourceMetadata(
                new SeekableStreamEndSequenceNumbers("i-am-not-real", ImmutableMap.of(SHARD_ID, "10", "1", "20", "2", "30")));

        final DruidException thrown = Assert.assertThrows(
                DruidException.class,
                () -> supervisor.resetOffsets(wrongStreamMetadata));
        MatcherAssert.assertThat(
                thrown,
                DruidExceptionMatcher.invalidInput()
                        .expectMessageIs("Stream[i-am-not-real] doesn't exist in the supervisor[testSupervisorId]. Supervisor is consuming stream[stream]."));
    }

    /**
     * Starts an emitting test supervisor whose supplied values include a negative entry
     * ({@code -100L}), moves {@code sequenceLastUpdated} far into the past, and expects
     * {@code emitLag()} to leave the emitter without any events.
     */
    @Test
    public void testStaleOffsetsNegativeLagNotEmitted() throws Exception {
        expectEmitterSupervisor(false);

        final CountDownLatch latch = new CountDownLatch(1);
        final TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
                latch,
                (byte) 1,
                ImmutableMap.of(SHARD_ID, 10L, "1", -100L),
                null);
        supervisor.start();

        // Back-date the last sequence update by Integer.MAX_VALUE milliseconds.
        supervisor.sequenceLastUpdated = DateTimes.nowUtc().minus((long) Integer.MAX_VALUE);

        latch.await();
        supervisor.emitLag();
        Assert.assertEquals(0L, this.emitter.getEvents().size());
    }

    /**
     * Waits for the supervisor's notice queue to empty and then checks the state left behind by
     * a {@code resetOffsets} call.
     *
     * @param testSeekableStreamSupervisor supervisor under test
     * @param immutableMap partitions that were reset; each key is expected to map to the
     *                     supervisor's not-set marker in its partition offsets
     * @param i expected number of active task groups
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void validateSupervisorStateAfterResetOffsets(TestSeekableStreamSupervisor testSeekableStreamSupervisor, ImmutableMap<String, String> immutableMap, int i) throws InterruptedException {
        final TestSeekableStreamSupervisor supervisor = testSeekableStreamSupervisor;

        // Poll every 100 ms until no notices remain queued.
        while (supervisor.getNoticesQueueSize() > 0) {
            Thread.sleep(100L);
        }
        // Fixed extra pause once the queue is empty.
        // NOTE(review): presumably gives the last dequeued notice time to finish — confirm.
        Thread.sleep(1000L);

        Assert.assertEquals(i, supervisor.getActiveTaskGroupsCount());
        Assert.assertEquals(immutableMap.size(), supervisor.getPartitionOffsets().size());
        for (String partitionId : immutableMap.keySet()) {
            Assert.assertEquals(supervisor.m139getNotSetMarker(), supervisor.getPartitionOffsets().get(partitionId));
        }
        verifyAll();
    }

    /**
     * Verifies the calls {@code scheduleReporting} makes on the executor it is given: one
     * {@code scheduleWithFixedDelay} call and two {@code scheduleAtFixedRate} calls, each with
     * the exact initial delay and period expected below.
     */
    @Test
    public void testScheduleReporting() {
        final long fixedDelayInitialDelayMillis = 86415000L;
        final long fixedDelayPeriodMillis = 300000L;
        final long fixedRateInitialDelayMillis = 86425000L;

        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        final DruidMonitorSchedulerConfig monitorConfig = new DruidMonitorSchedulerConfig();
        EasyMock.expect(this.spec.getMonitorSchedulerConfig()).andReturn(monitorConfig).times(2);

        final ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
        EasyMock.expect(
                executor.scheduleWithFixedDelay(
                        EasyMock.anyObject(),
                        EasyMock.eq(fixedDelayInitialDelayMillis),
                        EasyMock.eq(fixedDelayPeriodMillis),
                        EasyMock.eq(TimeUnit.MILLISECONDS)))
                .andReturn((ScheduledFuture) EasyMock.createMock(ScheduledFuture.class))
                .once();
        EasyMock.expect(
                executor.scheduleAtFixedRate(
                        EasyMock.anyObject(),
                        EasyMock.eq(fixedRateInitialDelayMillis),
                        EasyMock.eq(monitorConfig.getEmissionDuration().getMillis()),
                        EasyMock.eq(TimeUnit.MILLISECONDS)))
                .andReturn((ScheduledFuture) EasyMock.createMock(ScheduledFuture.class))
                .times(2);
        EasyMock.replay(executor, this.spec);

        final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor() {
            public LagStats computeLagStats() {
                return new LagStats(0L, 0L, 0L);
            }
        };
        supervisor.scheduleReporting(executor);

        EasyMock.verify(executor, this.spec);
    }

    /**
     * Returns the events whose {@code "metric"} entry, rendered with {@code toString()}, is
     * contained in the given list of metric names.
     *
     * @param list events to filter
     * @param list2 metric names to keep
     * @return the matching events, in encounter order
     */
    private List<Event> filterMetrics(List<Event> list, List<String> list2) {
        return list.stream()
                .filter(event -> list2.contains(event.toMap().get("metric").toString()))
                .collect(Collectors.toList());
    }

    /**
     * Replaces {@code spec} with a fresh mock, records the expectations used by the
     * emitter-related tests on the spec, record supplier, task storage and task queue, and then
     * switches all mocks to replay mode.
     *
     * @param z value the mocked spec returns from {@code isSuspended()}
     */
    private void expectEmitterSupervisor(boolean z) throws EntryExistsException {
        final SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(STREAM, new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), new Period("PT1S"), new Period("PT30S"), false, new Period("PT30M"), null, null, null, null, null, null) {
        };
        // Monitor scheduler config with a one-second emission duration.
        final DruidMonitorSchedulerConfig monitorConfig = new DruidMonitorSchedulerConfig() {
            public Duration getEmissionDuration() {
                return new Period("PT1S").toStandardDuration();
            }
        };

        this.spec = createMock(SeekableStreamSupervisorSpec.class);
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(ioConfig).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.getMonitorSchedulerConfig()).andReturn(monitorConfig).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(z).anyTimes();
        EasyMock.expect(this.spec.getType()).andReturn("test").anyTimes();
        EasyMock.expect(this.spec.getContextValue("tags")).andReturn(METRIC_TAGS).anyTimes();

        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        replayAll();
    }

    /**
     * Builds the {@code DataSchema} used by these tests: string dimensions "dim1" and "dim2",
     * an ISO "timestamp" column, a single count aggregator named "rows", and a uniform
     * granularity spec using {@code Granularities.HOUR} and {@code Granularities.NONE}.
     */
    private static DataSchema getDataSchema() {
        final DimensionsSpec dimensionsSpec = new DimensionsSpec(
                new ArrayList<>(Arrays.asList(StringDimensionSchema.create("dim1"), StringDimensionSchema.create("dim2"))));
        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", (DateTime) null);
        final AggregatorFactory[] aggregators = {new CountAggregatorFactory("rows")};
        final UniformGranularitySpec granularitySpec =
                new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of());
        return new DataSchema(DATASOURCE, timestampSpec, dimensionsSpec, aggregators, granularitySpec, (TransformSpec) null);
    }

    /**
     * Builds the supervisor IO config used by these tests. It reads {@code STREAM} with a JSON
     * input format and carries an {@code AutoScalerConfig} that {@code OBJECT_MAPPER} converts
     * from {@link #getProperties()}.
     */
    private static SeekableStreamSupervisorIOConfig getIOConfig() {
        final JsonInputFormat inputFormat =
                new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false);
        final AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class);
        return new SeekableStreamSupervisorIOConfig(STREAM, inputFormat, 1, 1, new Period("PT1H"), new Period("P1D"), new Period("PT30S"), false, new Period("PT30M"), null, null, autoScalerConfig, null, null, null) {
        };
    }

    /**
     * Returns a new mutable map of auto-scaler settings; {@code getIOConfig()} converts it into
     * an {@code AutoScalerConfig}.
     */
    private static Map<String, Object> getProperties() {
        final Map<String, Object> autoScalerProperties = new HashMap<>();
        autoScalerProperties.put("enableTaskAutoScaler", true);

        // Lag collection.
        autoScalerProperties.put("lagCollectionIntervalMillis", 500);
        autoScalerProperties.put("lagCollectionRangeMillis", 500);

        // Scale-out / scale-in thresholds.
        autoScalerProperties.put("scaleOutThreshold", 5000000);
        autoScalerProperties.put("triggerScaleOutFractionThreshold", 0.3d);
        autoScalerProperties.put("scaleInThreshold", 1000000);
        autoScalerProperties.put("triggerScaleInFractionThreshold", 0.8d);

        // Scale action timing.
        autoScalerProperties.put("scaleActionStartDelayMillis", 0);
        autoScalerProperties.put("scaleActionPeriodMillis", 100);

        // Task count bounds and step sizes.
        autoScalerProperties.put("taskCountMax", 8);
        autoScalerProperties.put("taskCountMin", 1);
        autoScalerProperties.put("scaleInStep", 1);
        autoScalerProperties.put("scaleOutStep", 2);

        autoScalerProperties.put("minTriggerScaleActionFrequencyMillis", 1200000);
        return autoScalerProperties;
    }

    /**
     * Returns a stub supervisor tuning config with fixed values: one worker thread, one chat
     * retry, a one-minute HTTP timeout, a one-second shutdown timeout, a two-minute repartition
     * transition duration and a five-minute offset fetch period.
     * <p>
     * {@code convertToTaskTuningConfig()} returns a task tuning config constructed with all-null
     * arguments whose {@code withBasePersistDirectory} and {@code toString} both return
     * {@code null}.
     */
    private static SeekableStreamSupervisorTuningConfig getTuningConfig() {
        return new SeekableStreamSupervisorTuningConfig() { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.9
            public Integer getWorkerThreads() {
                return 1;
            }

            public Long getChatRetries() {
                return 1L;
            }

            public Duration getHttpTimeout() {
                return new Period("PT1M").toStandardDuration();
            }

            public Duration getShutdownTimeout() {
                return new Period("PT1S").toStandardDuration();
            }

            public Duration getRepartitionTransitionDuration() {
                return new Period("PT2M").toStandardDuration();
            }

            public Duration getOffsetFetchPeriod() {
                return new Period("PT5M").toStandardDuration();
            }

            public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() {
                return new SeekableStreamIndexTaskTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.9.1
                    // The decompiler had renamed this method to m137withBasePersistDirectory
                    // ("merged with bridge method"). Under that name it no longer overrides the
                    // supertype's withBasePersistDirectory, so the original name is restored.
                    public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File file) {
                        return null;
                    }

                    public String toString() {
                        return null;
                    }
                };
            }
        };
    }

    /**
     * Builds a task IO config for {@code STREAM} from the given start and end sequence maps.
     *
     * @param i first constructor argument of the task IO config (boxed to {@code Integer})
     * @param map partition-to-sequence map for the start sequence numbers
     * @param map2 partition-to-sequence map for the end sequence numbers
     * @param str second constructor argument of the task IO config
     * @param dateTime first of the two {@code DateTime} constructor arguments
     * @param dateTime2 second of the two {@code DateTime} constructor arguments
     * @param set third argument of the start sequence numbers
     * @param seekableStreamSupervisorIOConfig supervisor IO config whose input format is reused
     */
    private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt(int i, Map<String, String> map, Map<String, String> map2, String str, DateTime dateTime, DateTime dateTime2, Set<String> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig) {
        final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers =
                new SeekableStreamStartSequenceNumbers<>(STREAM, map, set);
        final SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
                new SeekableStreamEndSequenceNumbers<>(STREAM, map2);
        return new SeekableStreamIndexTaskIOConfig<String, String>(i, str, startSequenceNumbers, endSequenceNumbers, true, dateTime, dateTime2, seekableStreamSupervisorIOConfig.getInputFormat()) {
        };
    }

    // Synthetic accessor that simply forwards to the private static getDataSchema().
    // NOTE(review): presumably generated so nested/anonymous classes can reach the private
    // helper — confirm before removing or renaming.
    static /* synthetic */ DataSchema access$1100() {
        return getDataSchema();
    }
}
