package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.NoopInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractFixedIntervalTask;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Hours;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/indexing/overlord/TaskLifecycleTest.class */
public class TaskLifecycleTest extends InitializedNullHandlingTest {
    private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage";
    private static final String METADATA_TASK_STORAGE = "MetadataTaskStorage";
    private final String taskStorageType;
    private ObjectMapper mapper;
    private IndexSpec indexSpec;
    private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
    private MonitorScheduler monitorScheduler;
    private ServiceEmitter emitter;
    private TaskLockConfig lockConfig;
    private TaskQueueConfig tqc;
    private TaskConfig taskConfig;
    private DataSegmentPusher dataSegmentPusher;
    private int pushedSegments;
    private int announcedSinks;
    private SegmentHandoffNotifierFactory handoffNotifierFactory;
    private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
    private static CountDownLatch publishCountDown;
    private static final TestUtils TEST_UTILS = new TestUtils();
    private static final ObjectMapper MAPPER = TEST_UTILS.getTestObjectMapper();
    private static final IndexMergerV9Factory INDEX_MERGER_V9_FACTORY = TEST_UTILS.getIndexMergerV9Factory();
    private static final IndexIO INDEX_IO = TEST_UTILS.getTestIndexIO();
    private static final Ordering<DataSegment> BY_INTERVAL_ORDERING = new Ordering<DataSegment>() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.1
        public int compare(DataSegment dataSegment, DataSegment dataSegment2) {
            return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval());
        }
    };
    private static DateTime now = DateTimes.nowUtc();
    private static final Iterable<InputRow> REALTIME_IDX_TASK_INPUT_ROWS = ImmutableList.of(ir(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f), ir(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f), ir(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f));
    private static final Iterable<InputRow> IDX_TASK_INPUT_ROWS = ImmutableList.of(ir("2010-01-01T01", "x", "y", 1.0f), ir("2010-01-01T01", "x", "z", 1.0f), ir("2010-01-02T01", "a", "b", 2.0f), ir("2010-01-02T01", "a", "c", 1.0f));

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    private TaskQueryTool tsqa = null;
    private TaskStorage taskStorage = null;
    private TaskLockbox taskLockbox = null;
    private TaskQueue taskQueue = null;
    private TaskRunner taskRunner = null;
    private TestIndexerMetadataStorageCoordinator mdc = null;
    private TaskActionClientFactory tac = null;
    private TaskToolboxFactory tb = null;
    private DruidNode druidNode = new DruidNode("dummy", "dummy", false, 10000, (Integer) null, true, false);
    private TaskLocation taskLocation = TaskLocation.create(this.druidNode.getHost(), this.druidNode.getPlaintextPort(), this.druidNode.getTlsPort());

    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskLifecycleTest$MockExceptionInputSource.class */
    private static class MockExceptionInputSource extends AbstractInputSource {
        private MockExceptionInputSource() {
        }

        protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File file) {
            return new InputSourceReader() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.MockExceptionInputSource.1
                public CloseableIterator<InputRow> read(InputStats inputStats) {
                    return new CloseableIterator<InputRow>() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.MockExceptionInputSource.1.1
                        public void close() {
                        }

                        public boolean hasNext() {
                            return true;
                        }

                        /* renamed from: next, reason: merged with bridge method [inline-methods] */
                        public InputRow m89next() {
                            throw new RuntimeException("HA HA HA");
                        }
                    };
                }

                public CloseableIterator<InputRowListPlusRawValues> sample() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        public boolean isSplittable() {
            return false;
        }

        public boolean needsFormat() {
            return false;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskLifecycleTest$MockInputSource.class */
    private static class MockInputSource extends AbstractInputSource {
        private MockInputSource() {
        }

        protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File file) {
            return new InputSourceReader() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.MockInputSource.1
                public CloseableIterator<InputRow> read(InputStats inputStats) {
                    return CloseableIterators.withEmptyBaggage(TaskLifecycleTest.IDX_TASK_INPUT_ROWS.iterator());
                }

                public CloseableIterator<InputRowListPlusRawValues> sample() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        public boolean isSplittable() {
            return false;
        }

        public boolean needsFormat() {
            return false;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "taskStorageType={0}")
    public static Collection<String[]> constructFeed() {
        return Arrays.asList(new String[]{HEAP_TASK_STORAGE}, new String[]{METADATA_TASK_STORAGE});
    }

    public TaskLifecycleTest(String str) {
        this.taskStorageType = str;
    }

    private static ServiceEmitter newMockEmitter() {
        return new NoopServiceEmitter();
    }

    private static InputRow ir(String str, String str2, String str3, float f) {
        return new MapBasedInputRow(DateTimes.of(str).getMillis(), ImmutableList.of("dim1", "dim2"), ImmutableMap.of("dim1", str2, "dim2", str3, "met", Float.valueOf(f)));
    }

    @Before
    public void setUp() throws Exception {
        this.queryRunnerFactoryConglomerate = (QueryRunnerFactoryConglomerate) EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class);
        this.monitorScheduler = (MonitorScheduler) EasyMock.createStrictMock(MonitorScheduler.class);
        this.announcedSinks = 0;
        this.pushedSegments = 0;
        this.indexSpec = IndexSpec.DEFAULT;
        this.emitter = newMockEmitter();
        EmittingLogger.registerEmitter(this.emitter);
        this.mapper = TEST_UTILS.getTestObjectMapper();
        this.handOffCallbacks = new ConcurrentHashMap();
        this.taskStorage = setUpTaskStorage();
        this.handoffNotifierFactory = setUpSegmentHandOffNotifierFactory();
        this.dataSegmentPusher = setUpDataSegmentPusher();
        this.mdc = setUpMetadataStorageCoordinator();
        this.tb = setUpTaskToolboxFactory(this.dataSegmentPusher, this.handoffNotifierFactory, this.mdc);
        this.taskRunner = setUpThreadPoolTaskRunner(this.tb);
        this.taskQueue = setUpTaskQueue(this.taskStorage, this.taskRunner);
    }

    private TaskStorage setUpTaskStorage() {
        HeapMemoryTaskStorage metadataTaskStorage;
        Preconditions.checkNotNull(this.mapper);
        Preconditions.checkNotNull(this.derbyConnectorRule);
        String str = this.taskStorageType;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1955685175:
                if (str.equals(HEAP_TASK_STORAGE)) {
                    z = false;
                    break;
                }
                break;
            case 2132526695:
                if (str.equals(METADATA_TASK_STORAGE)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                metadataTaskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig((Period) null));
                break;
            case true:
                TestDerbyConnector connector = this.derbyConnectorRule.getConnector();
                this.mapper.registerSubtypes(new NamedType[]{new NamedType(MockInputSource.class, "mockInputSource"), new NamedType(NoopInputFormat.class, "noopInputFormat")});
                connector.createTaskTables();
                connector.createSegmentSchemasTable();
                connector.createSegmentTable();
                metadataTaskStorage = new MetadataTaskStorage(connector, new TaskStorageConfig((Period) null), new DerbyMetadataStorageActionHandlerFactory(connector, (MetadataStorageTablesConfig) this.derbyConnectorRule.metadataTablesConfigSupplier().get(), this.mapper));
                break;
            default:
                throw new RE("Unknown task storage type [%s]", new Object[]{this.taskStorageType});
        }
        TaskMaster taskMaster = (TaskMaster) EasyMock.createMock(TaskMaster.class);
        EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(new Object[]{taskMaster});
        this.tsqa = new TaskQueryTool(metadataTaskStorage, this.taskLockbox, taskMaster, (ProvisioningStrategy) null, (Supplier) null);
        return metadataTaskStorage;
    }

    private SegmentHandoffNotifierFactory setUpSegmentHandOffNotifierFactory() {
        Preconditions.checkNotNull(this.handOffCallbacks);
        return new SegmentHandoffNotifierFactory() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.2
            public SegmentHandoffNotifier createSegmentHandoffNotifier(String str, String str2) {
                return new SegmentHandoffNotifier() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.2.1
                    public boolean registerSegmentHandoffCallback(SegmentDescriptor segmentDescriptor, Executor executor, Runnable runnable) {
                        TaskLifecycleTest.this.handOffCallbacks.put(segmentDescriptor, new Pair<>(executor, runnable));
                        return true;
                    }

                    public void start() {
                    }

                    public void close() {
                    }
                };
            }
        };
    }

    private DataSegmentPusher setUpDataSegmentPusher() {
        return new DataSegmentPusher() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.3
            public String getPathForHadoop() {
                throw new UnsupportedOperationException();
            }

            @Deprecated
            public String getPathForHadoop(String str) {
                return getPathForHadoop();
            }

            public DataSegment push(File file, DataSegment dataSegment, boolean z) {
                TaskLifecycleTest.this.pushedSegments++;
                return dataSegment;
            }

            public Map<String, Object> makeLoadSpec(URI uri) {
                throw new UnsupportedOperationException();
            }
        };
    }

    private TestIndexerMetadataStorageCoordinator setUpMetadataStorageCoordinator() {
        return new TestIndexerMetadataStorageCoordinator() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.4
            @Override // org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator
            public Set<DataSegment> commitSegments(Set<DataSegment> set, SegmentSchemaMapping segmentSchemaMapping) {
                Set<DataSegment> commitSegments = super.commitSegments(set, segmentSchemaMapping);
                if (TaskLifecycleTest.publishCountDown != null) {
                    TaskLifecycleTest.publishCountDown.countDown();
                }
                return commitSegments;
            }
        };
    }

    private TaskToolboxFactory setUpTaskToolboxFactory(DataSegmentPusher dataSegmentPusher, SegmentHandoffNotifierFactory segmentHandoffNotifierFactory, TestIndexerMetadataStorageCoordinator testIndexerMetadataStorageCoordinator) throws IOException {
        return setUpTaskToolboxFactory(dataSegmentPusher, segmentHandoffNotifierFactory, testIndexerMetadataStorageCoordinator, new TestAppenderatorsManager());
    }

    private TaskToolboxFactory setUpTaskToolboxFactory(DataSegmentPusher dataSegmentPusher, SegmentHandoffNotifierFactory segmentHandoffNotifierFactory, TestIndexerMetadataStorageCoordinator testIndexerMetadataStorageCoordinator, AppenderatorsManager appenderatorsManager) throws IOException {
        Preconditions.checkNotNull(this.queryRunnerFactoryConglomerate);
        Preconditions.checkNotNull(this.monitorScheduler);
        Preconditions.checkNotNull(this.taskStorage);
        Preconditions.checkNotNull(this.emitter);
        this.taskLockbox = new TaskLockbox(this.taskStorage, testIndexerMetadataStorageCoordinator);
        this.tac = new LocalTaskActionClientFactory(new TaskActionToolbox(this.taskLockbox, this.taskStorage, testIndexerMetadataStorageCoordinator, this.emitter, (SupervisorManager) EasyMock.createMock(SupervisorManager.class), this.mapper));
        this.taskConfig = new TaskConfigBuilder().setBaseDir(this.temporaryFolder.newFolder().toString()).setDefaultRowFlushBoundary(50000).setTmpStorageBytesPerTask(-1L).build();
        return new TaskToolboxFactory((SegmentLoaderConfig) null, this.taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, (Integer) null, true, false), this.tac, this.emitter, dataSegmentPusher, new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()), (dataSegment, map) -> {
            return dataSegment;
        }, new NoopDataSegmentArchiver(), new TestDataSegmentAnnouncer() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.5
            @Override // org.apache.druid.indexing.test.TestDataSegmentAnnouncer
            public void announceSegment(DataSegment dataSegment2) {
                TaskLifecycleTest.this.announcedSinks++;
            }
        }, (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentHandoffNotifierFactory, () -> {
            return this.queryRunnerFactoryConglomerate;
        }, DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> {
            return this.monitorScheduler;
        }, new SegmentCacheManagerFactory(TestIndex.INDEX_IO, new DefaultObjectMapper()), MAPPER, INDEX_IO, MapCache.create(0L), new CacheConfig(), new CachePopulatorStats(), INDEX_MERGER_V9_FACTORY, (DruidNodeAnnouncer) EasyMock.createNiceMock(DruidNodeAnnouncer.class), (DruidNode) EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), new DataNodeService("tier", 1000L, ServerType.INDEXER_EXECUTOR, 0), new NoopTestTaskReportFileWriter(), (IntermediaryDataManager) null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), TEST_UTILS.getRowIngestionMetersFactory(), appenderatorsManager, new NoopOverlordClient(), new NoopCoordinatorClient(), (ParallelIndexSupervisorTaskClientProvider) null, (ShuffleClient) null, (TaskLogPusher) null, "1", CentralizedDatasourceSchemaConfig.create());
    }

    private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory taskToolboxFactory) {
        Preconditions.checkNotNull(this.taskConfig);
        Preconditions.checkNotNull(this.emitter);
        return new SingleTaskBackgroundRunner(taskToolboxFactory, this.taskConfig, this.emitter, this.druidNode, new ServerConfig());
    }

    private TaskQueue setUpTaskQueue(TaskStorage taskStorage, TaskRunner taskRunner) throws Exception {
        Preconditions.checkNotNull(this.taskLockbox);
        Preconditions.checkNotNull(this.tac);
        Preconditions.checkNotNull(this.emitter);
        this.lockConfig = new TaskLockConfig();
        this.tqc = (TaskQueueConfig) this.mapper.readValue("{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}", TaskQueueConfig.class);
        return new TaskQueue(this.lockConfig, this.tqc, new DefaultTaskConfig(), taskStorage, taskRunner, this.tac, this.taskLockbox, this.emitter, this.mapper, new NoopTaskContextEnricher());
    }

    @After
    public void tearDown() {
        if (this.taskQueue.isActive()) {
            this.taskQueue.stop();
        }
    }

    @Test
    public void testIndexTask() {
        IndexTask indexTask = new IndexTask((String) null, (TaskResource) null, new IndexTask.IndexIngestionSpec(DataSchema.builder().withDataSource("foo").withTimestamp(new TimestampSpec((String) null, (String) null, (DateTime) null)).withDimensions(DimensionsSpec.EMPTY).withAggregators(new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}).withGranularity(new UniformGranularitySpec(Granularities.DAY, (Granularity) null, ImmutableList.of(Intervals.of("2010-01-01/P2D")))).build(), new IndexTask.IndexIOConfig(new MockInputSource(), new NoopInputFormat(), false, false), TuningConfigBuilder.forIndexTask().withMaxRowsPerSegment(10000).withMaxRowsInMemory(100).withIndexSpec(this.indexSpec).withMaxPendingPersists(3).withForceGuaranteedRollup(false).build()), (Map) null);
        Assert.assertTrue("pre run task status not present", !this.tsqa.getTaskStatus(indexTask.getId()).isPresent());
        TaskStatus runTask = runTask(indexTask);
        TaskStatus taskStatus = (TaskStatus) this.taskStorage.getStatus(indexTask.getId()).get();
        List sortedCopy = BY_INTERVAL_ORDERING.sortedCopy(this.mdc.getPublished());
        Assert.assertEquals("statusCode", TaskState.SUCCESS, taskStatus.getStatusCode());
        Assert.assertEquals(this.taskLocation, taskStatus.getLocation());
        Assert.assertEquals("merged statusCode", TaskState.SUCCESS, runTask.getStatusCode());
        Assert.assertEquals("num segments published", 2L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 0L, this.mdc.getNuked().size());
        Assert.assertEquals("segment1 datasource", "foo", ((DataSegment) sortedCopy.get(0)).getDataSource());
        Assert.assertEquals("segment1 interval", Intervals.of("2010-01-01/P1D"), ((DataSegment) sortedCopy.get(0)).getInterval());
        Assert.assertEquals("segment1 dimensions", ImmutableList.of("dim1", "dim2"), ((DataSegment) sortedCopy.get(0)).getDimensions());
        Assert.assertEquals("segment1 metrics", ImmutableList.of("met"), ((DataSegment) sortedCopy.get(0)).getMetrics());
        Assert.assertEquals("segment2 datasource", "foo", ((DataSegment) sortedCopy.get(1)).getDataSource());
        Assert.assertEquals("segment2 interval", Intervals.of("2010-01-02/P1D"), ((DataSegment) sortedCopy.get(1)).getInterval());
        Assert.assertEquals("segment2 dimensions", ImmutableList.of("dim1", "dim2"), ((DataSegment) sortedCopy.get(1)).getDimensions());
        Assert.assertEquals("segment2 metrics", ImmutableList.of("met"), ((DataSegment) sortedCopy.get(1)).getMetrics());
    }

    @Test
    public void testIndexTaskFailure() {
        TaskStatus runTask = runTask(new IndexTask((String) null, (TaskResource) null, new IndexTask.IndexIngestionSpec(DataSchema.builder().withDataSource("foo").withAggregators(new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}).withGranularity(new UniformGranularitySpec(Granularities.DAY, (Granularity) null, ImmutableList.of(Intervals.of("2010-01-01/P1D")))).withObjectMapper(this.mapper).build(), new IndexTask.IndexIOConfig(new MockExceptionInputSource(), new NoopInputFormat(), false, false), TuningConfigBuilder.forIndexTask().withMaxRowsPerSegment(10000).withMaxRowsInMemory(10).withIndexSpec(this.indexSpec).withMaxPendingPersists(3).withForceGuaranteedRollup(false).build()), (Map) null));
        Assert.assertEquals("statusCode", TaskState.FAILED, runTask.getStatusCode());
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("num segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testKillUnusedSegmentsTask() throws Exception {
        final File newFolder = this.temporaryFolder.newFolder();
        List<DataSegment> transform = Lists.transform(ImmutableList.of("2011-04-01/2011-04-02", "2011-04-02/2011-04-03", "2011-04-04/2011-04-05"), new Function<String, DataSegment>() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.6
            public DataSegment apply(String str) {
                Interval of = Intervals.of(str);
                try {
                    return DataSegment.builder().dataSource("test_kill_task").interval(of).loadSpec(ImmutableMap.of("type", "local", "path", newFolder.getCanonicalPath() + "/druid/localStorage/wikipedia/" + of.getStart() + "-" + of.getEnd() + "/2011-04-6T16:52:46.119-05:00/0/index.zip")).version("2011-04-6T16:52:46.119-05:00").dimensions(ImmutableList.of()).metrics(ImmutableList.of()).shardSpec(NoneShardSpec.instance()).binaryVersion(9).size(0L).build();
                } catch (IOException e) {
                    throw new ISE(e, "Error creating segments", new Object[0]);
                }
            }
        });
        this.mdc.setUnusedSegments(transform);
        ArrayList arrayList = new ArrayList();
        Iterator<DataSegment> it = this.mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"), null, null).iterator();
        while (it.hasNext()) {
            File file = new File((String) it.next().getLoadSpec().get("path"));
            FileUtils.mkdirp(file.getParentFile());
            Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY, new OpenOption[0]);
            arrayList.add(file);
        }
        TaskStatus runTask = runTask(new KillUnusedSegmentsTask((String) null, "test_kill_task", Intervals.of("2011-04-01/P4D"), (List) null, (Map) null, (Integer) null, (Integer) null, (DateTime) null));
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("merged statusCode", TaskState.SUCCESS, runTask.getStatusCode());
        Assert.assertEquals("num segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 3L, this.mdc.getNuked().size());
        Assert.assertEquals("delete segment batch call count", 2L, this.mdc.getDeleteSegmentsCount());
        Assert.assertTrue("expected unused segments get killed", transform.containsAll(this.mdc.getNuked()) && this.mdc.getNuked().containsAll(transform));
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Assert.assertFalse("unused segments files get deleted", ((File) it2.next()).exists());
        }
    }

    @Test
    public void testKillUnusedSegmentsTaskWithMaxSegmentsToKill() throws Exception {
        final File newFolder = this.temporaryFolder.newFolder();
        List<DataSegment> transform = Lists.transform(ImmutableList.of("2011-04-01/2011-04-02", "2011-04-02/2011-04-03", "2011-04-04/2011-04-05"), new Function<String, DataSegment>() { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.7
            public DataSegment apply(String str) {
                Interval of = Intervals.of(str);
                try {
                    return DataSegment.builder().dataSource("test_kill_task").interval(of).loadSpec(ImmutableMap.of("type", "local", "path", newFolder.getCanonicalPath() + "/druid/localStorage/wikipedia/" + of.getStart() + "-" + of.getEnd() + "/2011-04-6T16:52:46.119-05:00/0/index.zip")).version("2011-04-6T16:52:46.119-05:00").dimensions(ImmutableList.of()).metrics(ImmutableList.of()).shardSpec(NoneShardSpec.instance()).binaryVersion(9).size(0L).build();
                } catch (IOException e) {
                    throw new ISE(e, "Error creating segments", new Object[0]);
                }
            }
        });
        this.mdc.setUnusedSegments(transform);
        ArrayList arrayList = new ArrayList();
        Iterator<DataSegment> it = this.mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"), null, null).iterator();
        while (it.hasNext()) {
            File file = new File((String) it.next().getLoadSpec().get("path"));
            FileUtils.mkdirp(file.getParentFile());
            Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY, new OpenOption[0]);
            arrayList.add(file);
        }
        TaskStatus runTask = runTask(new KillUnusedSegmentsTask((String) null, "test_kill_task", Intervals.of("2011-04-01/P4D"), (List) null, (Map) null, (Integer) null, 2, (DateTime) null));
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("merged statusCode", TaskState.SUCCESS, runTask.getStatusCode());
        Assert.assertEquals("num segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 2L, this.mdc.getNuked().size());
        Assert.assertTrue("expected unused segments get killed", transform.containsAll(this.mdc.getNuked()));
        int size = arrayList.size() - 2;
        int i = 0;
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            if (((File) it2.next()).exists()) {
                i++;
            }
        }
        Assert.assertEquals("Expected of segments deleted did not match expectations", size, i);
    }

    @Test
    public void testRealtimeishTask() {
        TaskStatus runTask = runTask(new RealtimeishTask());
        Assert.assertEquals("statusCode", TaskState.SUCCESS, runTask.getStatusCode());
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("num segments published", 2L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testNoopTask() {
        TaskStatus runTask = runTask(NoopTask.create());
        Assert.assertEquals("statusCode", TaskState.SUCCESS, runTask.getStatusCode());
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("num segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testNeverReadyTask() {
        TaskStatus runTask = runTask(new NoopTask(null, null, null, 0L, 0L, null) { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.8
            public boolean isReady(TaskActionClient taskActionClient) {
                throw new ISE("Task will never be ready", new Object[0]);
            }
        });
        Assert.assertEquals("statusCode", TaskState.FAILED, runTask.getStatusCode());
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("num segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testSimple() {
        TaskStatus runTask = runTask(new AbstractFixedIntervalTask("id1", "id1", new TaskResource("id1", 1), "ds", Intervals.of("2012-01-01/P1D"), null) { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.9
            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
                Interval of = Intervals.of("2012-01-01/P1D");
                TaskLock taskLock = (TaskLock) taskToolbox.getTaskActionClient().submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, of));
                if (taskLock == null) {
                    throw new ISE("Failed to get a lock", new Object[0]);
                }
                taskToolbox.getTaskActionClient().submit(SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(DataSegment.builder().dataSource("ds").interval(of).version(taskLock.getVersion()).size(0L).build()), (DataSourceMetadata) null, (DataSourceMetadata) null, (SegmentSchemaMapping) null));
                return TaskStatus.success(getId());
            }
        });
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("statusCode", TaskState.SUCCESS, runTask.getStatusCode());
        Assert.assertEquals("segments published", 1L, this.mdc.getPublished().size());
        Assert.assertEquals("segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testBadInterval() {
        TaskStatus runTask = runTask(new AbstractFixedIntervalTask("id1", "id1", "ds", Intervals.of("2012-01-01/P1D"), null) { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.10
            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
                taskToolbox.getTaskActionClient().submit(SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(DataSegment.builder().dataSource("ds").interval(Intervals.of("2012-01-01/P2D")).version(((TaskLock) Iterables.getOnlyElement((Iterable) taskToolbox.getTaskActionClient().submit(new LockListAction()))).getVersion()).size(0L).build()), (DataSourceMetadata) null, (DataSourceMetadata) null, (SegmentSchemaMapping) null));
                return TaskStatus.success(getId());
            }
        });
        Assert.assertEquals("statusCode", TaskState.FAILED, runTask.getStatusCode());
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testBadVersion() {
        TaskStatus runTask = runTask(new AbstractFixedIntervalTask("id1", "id1", "ds", Intervals.of("2012-01-01/P1D"), null) { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.11
            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
                taskToolbox.getTaskActionClient().submit(SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(DataSegment.builder().dataSource("ds").interval(Intervals.of("2012-01-01/P1D")).version(((TaskLock) Iterables.getOnlyElement((Iterable) taskToolbox.getTaskActionClient().submit(new LockListAction()))).getVersion() + "1!!!1!!").size(0L).build()), (DataSourceMetadata) null, (DataSourceMetadata) null, (SegmentSchemaMapping) null));
                return TaskStatus.success(getId());
            }
        });
        Assert.assertEquals("statusCode", TaskState.FAILED, runTask.getStatusCode());
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("segments nuked", 0L, this.mdc.getNuked().size());
    }

    @Test
    public void testResumeTasks() throws Exception {
        IndexTask indexTask = new IndexTask((String) null, (TaskResource) null, new IndexTask.IndexIngestionSpec(DataSchema.builder().withDataSource("foo").withTimestamp(new TimestampSpec((String) null, (String) null, (DateTime) null)).withDimensions(DimensionsSpec.EMPTY).withAggregators(new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}).withGranularity(new UniformGranularitySpec(Granularities.DAY, (Granularity) null, ImmutableList.of(Intervals.of("2010-01-01/P2D")))).build(), new IndexTask.IndexIOConfig(new MockInputSource(), new NoopInputFormat(), false, false), TuningConfigBuilder.forIndexTask().withMaxRowsPerSegment(10000).withMaxRowsInMemory(10).withIndexSpec(this.indexSpec).build()), (Map) null);
        long currentTimeMillis = System.currentTimeMillis();
        this.taskQueue.start();
        this.taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId()));
        while (((TaskStatus) this.tsqa.getTaskStatus(indexTask.getId()).get()).isRunnable()) {
            if (System.currentTimeMillis() > currentTimeMillis + 10000) {
                throw new ISE("Where did the task go?!: %s", new Object[]{indexTask.getId()});
            }
            Thread.sleep(100L);
        }
        TaskStatus taskStatus = (TaskStatus) this.taskStorage.getStatus(indexTask.getId()).get();
        List sortedCopy = BY_INTERVAL_ORDERING.sortedCopy(this.mdc.getPublished());
        Assert.assertEquals("statusCode", TaskState.SUCCESS, taskStatus.getStatusCode());
        Assert.assertEquals(this.taskLocation, taskStatus.getLocation());
        Assert.assertEquals("num segments published", 2L, this.mdc.getPublished().size());
        Assert.assertEquals("num segments nuked", 0L, this.mdc.getNuked().size());
        Assert.assertEquals("segment1 datasource", "foo", ((DataSegment) sortedCopy.get(0)).getDataSource());
        Assert.assertEquals("segment1 interval", Intervals.of("2010-01-01/P1D"), ((DataSegment) sortedCopy.get(0)).getInterval());
        Assert.assertEquals("segment1 dimensions", ImmutableList.of("dim1", "dim2"), ((DataSegment) sortedCopy.get(0)).getDimensions());
        Assert.assertEquals("segment1 metrics", ImmutableList.of("met"), ((DataSegment) sortedCopy.get(0)).getMetrics());
        Assert.assertEquals("segment2 datasource", "foo", ((DataSegment) sortedCopy.get(1)).getDataSource());
        Assert.assertEquals("segment2 interval", Intervals.of("2010-01-02/P1D"), ((DataSegment) sortedCopy.get(1)).getInterval());
        Assert.assertEquals("segment2 dimensions", ImmutableList.of("dim1", "dim2"), ((DataSegment) sortedCopy.get(1)).getDimensions());
        Assert.assertEquals("segment2 metrics", ImmutableList.of("met"), ((DataSegment) sortedCopy.get(1)).getMetrics());
    }

    @Test
    public void testUnifiedAppenderatorsManagerCleanup() throws Exception {
        UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(new ForwardingQueryProcessingPool(Execs.multiThreaded(8, "TaskLifecycleTest-%d")), JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new WorkerConfig(), MapCache.create(2048L), new CacheConfig(), new CachePopulatorStats(), MAPPER, new NoopServiceEmitter(), () -> {
            return this.queryRunnerFactoryConglomerate;
        });
        this.tb = setUpTaskToolboxFactory(this.dataSegmentPusher, this.handoffNotifierFactory, this.mdc, unifiedIndexerAppenderatorsManager);
        this.taskRunner = setUpThreadPoolTaskRunner(this.tb);
        this.taskQueue = setUpTaskQueue(this.taskStorage, this.taskRunner);
        IndexTask indexTask = new IndexTask((String) null, (TaskResource) null, new IndexTask.IndexIngestionSpec(DataSchema.builder().withDataSource("foo").withTimestamp(new TimestampSpec((String) null, (String) null, (DateTime) null)).withDimensions(DimensionsSpec.EMPTY).withAggregators(new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}).withGranularity(new UniformGranularitySpec(Granularities.DAY, (Granularity) null, ImmutableList.of(Intervals.of("2010-01-01/P2D")))).build(), new IndexTask.IndexIOConfig(new MockInputSource(), new NoopInputFormat(), false, false), TuningConfigBuilder.forIndexTask().withMaxRowsPerSegment(10000).withMaxRowsInMemory(10).withIndexSpec(this.indexSpec).withMaxPendingPersists(3).withForceGuaranteedRollup(false).build()), (Map) null);
        Assert.assertTrue("pre run task status not present", !this.tsqa.getTaskStatus(indexTask.getId()).isPresent());
        runTask(indexTask);
        Assert.assertEquals("statusCode", TaskState.SUCCESS, ((TaskStatus) this.taskStorage.getStatus(indexTask.getId()).get()).getStatusCode());
        Map datasourceBundles = unifiedIndexerAppenderatorsManager.getDatasourceBundles();
        Assert.assertEquals(1L, datasourceBundles.size());
        unifiedIndexerAppenderatorsManager.removeAppenderatorsForTask(indexTask.getId(), "foo");
        Assert.assertTrue(datasourceBundles.isEmpty());
    }

    @Test
    public void testLockRevoked() {
        TaskStatus runTask = runTask(new AbstractFixedIntervalTask("id1", "id1", new TaskResource("id1", 1), "ds", Intervals.of("2012-01-01/P1D"), null) { // from class: org.apache.druid.indexing.overlord.TaskLifecycleTest.12
            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
                TimeChunkLockTryAcquireAction timeChunkLockTryAcquireAction = new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, Intervals.of("2012-01-01/P1D"));
                TaskLock taskLock = (TaskLock) taskToolbox.getTaskActionClient().submit(timeChunkLockTryAcquireAction);
                if (taskLock == null) {
                    throw new ISE("Failed to get a lock", new Object[0]);
                }
                Assert.assertFalse(((TaskLock) taskToolbox.getTaskActionClient().submit(timeChunkLockTryAcquireAction)).isRevoked());
                TaskLifecycleTest.this.taskLockbox.revokeLock(getId(), taskLock);
                Assert.assertTrue(((TaskLock) taskToolbox.getTaskActionClient().submit(timeChunkLockTryAcquireAction)).isRevoked());
                return TaskStatus.failure(getId(), "lock revoked test");
            }
        });
        Assert.assertEquals(this.taskLocation, runTask.getLocation());
        Assert.assertEquals("statusCode", TaskState.FAILED, runTask.getStatusCode());
        Assert.assertEquals("segments published", 0L, this.mdc.getPublished().size());
        Assert.assertEquals("segments nuked", 0L, this.mdc.getNuked().size());
    }

    private TaskStatus runTask(Task task) {
        Stopwatch createStarted = Stopwatch.createStarted();
        synchronized (this) {
            if (!this.taskQueue.isActive()) {
                this.taskQueue.start();
            }
        }
        this.taskQueue.add(task);
        String id = task.getId();
        TaskStatus taskStatus = null;
        while (true) {
            try {
                TaskStatus taskStatus2 = (TaskStatus) this.tsqa.getTaskStatus(id).get();
                if (!taskStatus2.isRunnable()) {
                    if (id.equals(task.getId())) {
                        taskStatus = taskStatus2;
                    }
                    return taskStatus;
                }
                if (createStarted.millisElapsed() > 10000) {
                    throw new ISE("Where did the task go?!: %s", new Object[]{task.getId()});
                }
                Thread.sleep(100L);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
