package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTaskRunTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.class */
public class SeekableStreamSupervisorStateTest extends EasyMockSupport {
    private static final String DATASOURCE = "testDS";
    private static final String EXCEPTION_MSG = "I had an exception";
    private TaskStorage taskStorage;
    private TaskMaster taskMaster;
    private TaskRunner taskRunner;
    private TaskQueue taskQueue;
    private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private SeekableStreamIndexTaskClientFactory taskClientFactory;
    private SeekableStreamSupervisorSpec spec;
    private SeekableStreamIndexTaskClient indexTaskClient;
    private RecordSupplier<String, String> recordSupplier;
    private RowIngestionMetersFactory rowIngestionMetersFactory;
    private SupervisorStateManagerConfig supervisorConfig;
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
    private static final String STREAM = "stream";
    private static final String SHARD_ID = "0";
    private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestSeekableStreamIndexTask.class */
    private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String> {
        public TestSeekableStreamIndexTask(String str, @Nullable TaskResource taskResource, DataSchema dataSchema, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, SeekableStreamIndexTaskIOConfig<String, String> seekableStreamIndexTaskIOConfig, @Nullable Map<String, Object> map, @Nullable ChatHandlerProvider chatHandlerProvider, AuthorizerMapper authorizerMapper, RowIngestionMetersFactory rowIngestionMetersFactory, @Nullable String str2) {
            super(str, taskResource, dataSchema, seekableStreamIndexTaskTuningConfig, seekableStreamIndexTaskIOConfig, map, chatHandlerProvider, authorizerMapper, rowIngestionMetersFactory, str2, (AppenderatorsManager) null);
        }

        protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner() {
            return null;
        }

        protected RecordSupplier<String, String> newTaskRecordSupplier() {
            return SeekableStreamSupervisorStateTest.this.recordSupplier;
        }

        public String getType() {
            return CompactionTaskRunTest.DATA_SOURCE;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest$TestSeekableStreamSupervisor.class */
    private class TestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String> {
        private TestSeekableStreamSupervisor() {
            super("testSupervisorId", SeekableStreamSupervisorStateTest.this.taskStorage, SeekableStreamSupervisorStateTest.this.taskMaster, SeekableStreamSupervisorStateTest.this.indexerMetadataStorageCoordinator, SeekableStreamSupervisorStateTest.this.taskClientFactory, SeekableStreamSupervisorStateTest.OBJECT_MAPPER, SeekableStreamSupervisorStateTest.this.spec, SeekableStreamSupervisorStateTest.this.rowIngestionMetersFactory, false);
        }

        protected String baseTaskName() {
            return CompactionTaskRunTest.DATA_SOURCE;
        }

        protected void updateLatestSequenceFromStream(RecordSupplier<String, String> recordSupplier, Set<StreamPartition<String>> set) {
        }

        protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int i, Map<String, String> map, Map<String, String> map2, String str, DateTime dateTime, DateTime dateTime2, Set<String> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig) {
            return new SeekableStreamIndexTaskIOConfig<String, String>(Integer.valueOf(i), str, new SeekableStreamStartSequenceNumbers(SeekableStreamSupervisorStateTest.STREAM, map, set), new SeekableStreamEndSequenceNumbers(SeekableStreamSupervisorStateTest.STREAM, map2), true, dateTime, dateTime2) { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.TestSeekableStreamSupervisor.1
            };
        }

        protected List<SeekableStreamIndexTask<String, String>> createIndexTasks(int i, String str, ObjectMapper objectMapper, TreeMap<Integer, Map<String, String>> treeMap, SeekableStreamIndexTaskIOConfig seekableStreamIndexTaskIOConfig, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) {
            return ImmutableList.of(new TestSeekableStreamIndexTask("id", null, SeekableStreamSupervisorStateTest.access$900(), seekableStreamIndexTaskTuningConfig, seekableStreamIndexTaskIOConfig, null, null, null, rowIngestionMetersFactory, null));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public int getTaskGroupIdForPartition(String str) {
            return 0;
        }

        protected boolean checkSourceMetadataMatch(DataSourceMetadata dataSourceMetadata) {
            return true;
        }

        protected boolean doesTaskTypeMatchSupervisor(Task task) {
            return true;
        }

        protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String str, Map<String, String> map) {
            return null;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public OrderedSequenceNumber<String> makeSequenceNumber(String str, boolean z) {
            return new OrderedSequenceNumber<String>(str, z) { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.TestSeekableStreamSupervisor.2
                public int compareTo(OrderedSequenceNumber<String> orderedSequenceNumber) {
                    return new BigInteger((String) get()).compareTo(new BigInteger((String) orderedSequenceNumber.get()));
                }
            };
        }

        protected void scheduleReporting(ScheduledExecutorService scheduledExecutorService) {
        }

        protected Map<String, String> getLagPerPartition(Map<String, String> map) {
            return null;
        }

        protected RecordSupplier<String, String> setupRecordSupplier() {
            return SeekableStreamSupervisorStateTest.this.recordSupplier;
        }

        protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int i, boolean z) {
            return new SeekableStreamSupervisorReportPayload<String, String>(SeekableStreamSupervisorStateTest.DATASOURCE, SeekableStreamSupervisorStateTest.STREAM, 1, 1, 1L, null, null, null, null, false, true, null, null, null) { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.TestSeekableStreamSupervisor.3
            };
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: getNotSetMarker, reason: merged with bridge method [inline-methods] */
        public String m54getNotSetMarker() {
            return "NOT_SET";
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: getEndOfPartitionMarker, reason: merged with bridge method [inline-methods] */
        public String m53getEndOfPartitionMarker() {
            return "EOF";
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean isEndOfShard(String str) {
            return false;
        }

        protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
            return false;
        }
    }

    @Before
    public void setupTest() {
        this.taskStorage = (TaskStorage) createMock(TaskStorage.class);
        this.taskMaster = (TaskMaster) createMock(TaskMaster.class);
        this.taskRunner = (TaskRunner) createMock(TaskRunner.class);
        this.taskQueue = (TaskQueue) createMock(TaskQueue.class);
        this.indexerMetadataStorageCoordinator = (IndexerMetadataStorageCoordinator) createMock(IndexerMetadataStorageCoordinator.class);
        this.taskClientFactory = (SeekableStreamIndexTaskClientFactory) createMock(SeekableStreamIndexTaskClientFactory.class);
        this.spec = (SeekableStreamSupervisorSpec) createMock(SeekableStreamSupervisorSpec.class);
        this.indexTaskClient = (SeekableStreamIndexTaskClient) createMock(SeekableStreamIndexTaskClient.class);
        this.recordSupplier = (RecordSupplier) createMock(RecordSupplier.class);
        this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
        this.supervisorConfig = new SupervisorStateManagerConfig();
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.taskClientFactory.build((TaskInfoProvider) EasyMock.anyObject(), EasyMock.anyString(), EasyMock.anyInt(), (Duration) EasyMock.anyObject(), EasyMock.anyLong())).andReturn(this.indexTaskClient).anyTimes();
        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.of(this.taskRunner)).anyTimes();
        EasyMock.expect(this.taskMaster.getTaskQueue()).andReturn(Optional.of(this.taskQueue)).anyTimes();
        this.taskRunner.registerListener((TaskRunnerListener) EasyMock.anyObject(TaskRunnerListener.class), (Executor) EasyMock.anyObject(Executor.class));
        EasyMock.expect(this.indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn((Object) null).anyTimes();
        EasyMock.expect(this.recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
        EasyMock.expect(this.recordSupplier.getLatestSequenceNumber((StreamPartition) EasyMock.anyObject())).andReturn("10").anyTimes();
    }

    @Test
    public void testRunning() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        verifyAll();
    }

    @Test
    public void testConnectingToStreamFail() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andThrow(new StreamException(new IllegalStateException(EXCEPTION_MSG))).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        List exceptionEvents = ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Assert.assertTrue(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
        Assert.assertEquals(IllegalStateException.class.getName(), ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getExceptionClass());
        Assert.assertEquals(StringUtils.format("%s: %s", new Object[]{IllegalStateException.class.getName(), EXCEPTION_MSG}), ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getMessage());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        verifyAll();
    }

    @Test
    public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andThrow(new StreamException(new IllegalStateException())).times(3);
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andThrow(new StreamException(new IllegalStateException())).times(3);
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        verifyAll();
    }

    @Test
    public void testDiscoveringInitialTasksFailRecoveryFail() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasks()).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).times(3);
        EasyMock.expect(this.taskStorage.getActiveTasks()).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        List exceptionEvents = ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Assert.assertFalse(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
        Assert.assertEquals(IllegalStateException.class.getName(), ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getExceptionClass());
        Assert.assertEquals(EXCEPTION_MSG, ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getMessage());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(2L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        verifyAll();
    }

    @Test
    public void testCreatingTasksFailRecoveryFail() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).times(3);
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        List exceptionEvents = ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Assert.assertFalse(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
        Assert.assertEquals(IllegalStateException.class.getName(), ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getExceptionClass());
        Assert.assertEquals(EXCEPTION_MSG, ((SupervisorStateManager.ExceptionEvent) exceptionEvents.get(0)).getMessage());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().size());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        verifyAll();
    }

    @Test
    public void testSuspended() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(true).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        verifyAll();
    }

    @Test
    public void testStopping() throws Exception {
        EasyMock.expect(Boolean.valueOf(this.spec.isSuspended())).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.taskQueue.add((Task) EasyMock.anyObject()))).andReturn(true).anyTimes();
        this.taskRunner.unregisterListener("testSupervisorId");
        this.indexTaskClient.close();
        this.recordSupplier.close();
        replayAll();
        TestSeekableStreamSupervisor testSeekableStreamSupervisor = new TestSeekableStreamSupervisor();
        testSeekableStreamSupervisor.start();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.runInternal();
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isAtLeastOneSuccessfulRun());
        testSeekableStreamSupervisor.stop(false);
        Assert.assertTrue(((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, ((SeekableStreamSupervisor) testSeekableStreamSupervisor).stateManager.getSupervisorState().getBasicState());
        verifyAll();
    }

    private static DataSchema getDataSchema() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(StringDimensionSchema.create("dim1"));
        arrayList.add(StringDimensionSchema.create("dim2"));
        return new DataSchema(DATASOURCE, (Map) OBJECT_MAPPER.convertValue(new StringInputRowParser(new JSONParseSpec(new TimestampSpec("timestamp", "iso", (DateTime) null), new DimensionsSpec(arrayList, (List) null, (List) null), new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()), StandardCharsets.UTF_8.name()), Map.class), new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of()), (TransformSpec) null, OBJECT_MAPPER);
    }

    private static SeekableStreamSupervisorIOConfig getIOConfig() {
        return new SeekableStreamSupervisorIOConfig(STREAM, 1, 1, new Period("PT1H"), new Period("P1D"), new Period("PT30S"), false, new Period("PT30M"), null, null) { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.1
        };
    }

    private static SeekableStreamSupervisorTuningConfig getTuningConfig() {
        return new SeekableStreamSupervisorTuningConfig() { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.2
            public Integer getWorkerThreads() {
                return 1;
            }

            public Integer getChatThreads() {
                return 1;
            }

            public Long getChatRetries() {
                return 1L;
            }

            public Duration getHttpTimeout() {
                return new Period("PT1M").toStandardDuration();
            }

            public Duration getShutdownTimeout() {
                return new Period("PT1S").toStandardDuration();
            }

            public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() {
                return new SeekableStreamIndexTaskTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest.2.1
                    /* renamed from: withBasePersistDirectory, reason: merged with bridge method [inline-methods] */
                    public SeekableStreamIndexTaskTuningConfig m52withBasePersistDirectory(File file) {
                        return null;
                    }

                    public String toString() {
                        return null;
                    }
                };
            }
        };
    }

    static /* synthetic */ DataSchema access$900() {
        return getDataSchema();
    }
}
