package org.apache.druid.indexing.common.task;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.FirehoseFactoryV2;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestFirehose;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.indexing.test.TestDataSegmentPusher;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.class */
public class RealtimeIndexTaskTest {
    // Logger used by the task-running callable in runTask() to report task failures.
    private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
    // Service emitter backed by a NoopEmitter; registered with EmittingLogger and started in setUp().
    private static final ServiceEmitter emitter = new ServiceEmitter("service", "host", new NoopEmitter());

    // Used by tests that declare the exception (and its cause) they expect to be thrown.
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    // Supplies the per-test directories handed to makeToolbox().
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    // Timestamp captured in setUp(); tests derive row timestamps from it.
    private DateTime now;
    // Single-threaded executor on which runTask() runs the task under test; shut down in tearDown().
    private ListeningExecutorService taskExec;
    // Handoff callbacks recorded by the SegmentHandoffNotifier built in makeToolbox();
    // replaced with a fresh map on every makeToolbox() call.
    private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;

    /**
     * Registers and starts the shared emitter, creates the single-threaded executor that
     * runs tasks, and captures the current UTC time for the test.
     */
    @Before
    public void setUp() {
        EmittingLogger.registerEmitter(emitter);
        emitter.start();
        taskExec = MoreExecutors.listeningDecorator(
            Execs.singleThreaded("realtime-index-task-test-%d")
        );
        now = DateTimes.nowUtc();
    }

    /**
     * Stops the task executor created in {@code setUp()}.
     */
    @After
    public void tearDown() {
        taskExec.shutdownNow();
    }

    /**
     * Checks the id string produced by {@code RealtimeIndexTask.makeTaskId} for a fixed
     * data source, partition number, timestamp and suffix.
     */
    @Test
    public void testMakeTaskId() {
        final String expectedId = "index_realtime_test_0_2015-01-02T00:00:00.000Z_abcdefgh";
        final String actualId = RealtimeIndexTask.makeTaskId(
            IngestionTestBase.DATA_SOURCE,
            0,
            DateTimes.of("2015-01-02"),
            "abcdefgh"
        );
        Assert.assertEquals(expectedId, actualId);
    }

    /**
     * A task created without an explicit task resource must use its own id as the
     * availability group.
     */
    @Test(timeout = 60000)
    public void testDefaultResource() {
        final RealtimeIndexTask task = makeRealtimeTask(null);
        final String availabilityGroup = task.getTaskResource().getAvailabilityGroup();
        Assert.assertEquals(task.getId(), availabilityGroup);
    }

    /**
     * Runs a task built with a non-zero last argument (100) to {@code makeRealtimeTask}
     * (presumably the handoff timeout, going by the test name), feeds it a single row and
     * waits for a segment to be published. No handoff callback is ever fired here, and the
     * final {@code get()} on the task future is expected to throw {@link ExecutionException}.
     */
    @Test(timeout = 60000, expected = ExecutionException.class)
    public void testHandoffTimeout() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null, TransformSpec.NONE, true, 100L);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the task has opened its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1")
            )
        );
        // Closing the firehose signals that there is no more input.
        firehose.close();

        // Poll until the task has published something.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        Assert.assertEquals(1L, task.getMetrics().processed());
        Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished()));

        // Expected to throw; see the @Test annotation.
        statusFuture.get();
    }

    /**
     * Happy-path run: three rows are fed in (one of them a day older than {@code now}),
     * the metrics and queryable sums are checked, the recorded handoff callbacks are fired,
     * and the task is expected to finish with {@code SUCCESS}.
     */
    @Test(timeout = 60000)
    public void testBasics() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the task has opened its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                // Timestamp one day in the past.
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0d),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0d)
            )
        );
        firehose.close();

        // Poll until the task has published something.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        // Ingestion metrics.
        Assert.assertEquals(2L, task.getMetrics().processed());
        Assert.assertEquals(1L, task.getMetrics().thrownAway());
        Assert.assertEquals(0L, task.getMetrics().unparseable());

        // Queryable sums.
        Assert.assertEquals(2L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        // Simulate handoff: every registered callback must be for the published segment.
        handOffCallbacks.forEach((descriptor, callback) -> {
            Assert.assertEquals(
                new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum()
                ),
                descriptor
            );
            callback.lhs.execute(callback.rhs);
        });
        handOffCallbacks.clear();

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * Runs a task whose {@link TransformSpec} combines a selector filter on
     * {@code dim1 = "foo"} with an expression transform {@code dim1t = concat(dim1,dim1)},
     * then checks metrics, filtered sums, and successful completion after handoff.
     */
    @Test(timeout = 60000)
    public void testTransformSpec() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final TransformSpec transformSpec = new TransformSpec(
            new SelectorDimFilter("dim1", "foo", (ExtractionFn) null),
            ImmutableList.of(new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()))
        );
        final RealtimeIndexTask task = makeRealtimeTask(null, transformSpec, true, 0L);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the task has opened its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                // Timestamp one day in the past.
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0d),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0d)
            )
        );
        firehose.close();

        // Poll until the task has published something.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        // Ingestion metrics.
        Assert.assertEquals(1L, task.getMetrics().processed());
        Assert.assertEquals(2L, task.getMetrics().thrownAway());
        Assert.assertEquals(0L, task.getMetrics().unparseable());

        // Queryable sums, unfiltered and filtered on the transformed dimension.
        Assert.assertEquals(1L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(
            1L,
            sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", (ExtractionFn) null), "rows").longValue()
        );
        final Long barbarRows = sumMetric(task, new SelectorDimFilter("dim1t", "barbar", (ExtractionFn) null), "rows");
        // The expected value for a filter matching nothing depends on the null-handling mode.
        if (NullHandling.replaceWithDefault()) {
            Assert.assertEquals(0L, barbarRows.longValue());
        } else {
            Assert.assertNull(barbarRows);
        }
        Assert.assertEquals(1L, sumMetric(task, null, "met1").longValue());

        // Simulate handoff: every registered callback must be for the published segment.
        handOffCallbacks.forEach((descriptor, callback) -> {
            Assert.assertEquals(
                new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum()
                ),
                descriptor
            );
            callback.lhs.execute(callback.rhs);
        });
        handOffCallbacks.clear();

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * With the boolean flag of {@code makeRealtimeTask} set to {@code true}, feeding rows
     * whose {@code met1} value is the string {@code "foo"} must make the task future fail
     * with an {@link ExecutionException} caused by a {@link ParseException} naming the bad
     * value and field.
     */
    @Test(timeout = 60000)
    public void testReportParseExceptionsOnBadMetric() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null, true);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the task has opened its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                // Non-numeric metric value.
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
                // Non-numeric metric value, timestamp one day in the past.
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo"),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0d)
            )
        );
        firehose.close();

        // Expectations must be registered before the call that throws.
        expectedException.expect(ExecutionException.class);
        expectedException.expectCause(CoreMatchers.instanceOf(ParseException.class));
        expectedException.expectCause(
            ThrowableMessageMatcher.hasMessage(
                CoreMatchers.containsString("[Unable to parse value[foo] for field[met1]")
            )
        );

        statusFuture.get();
    }

    /**
     * With the boolean flag of {@code makeRealtimeTask} set to {@code false}, a mix of
     * good, null, bad-metric, failing and old rows must not fail the task. The assertions
     * pin 3 processed, 1 thrown away and 2 unparseable rows, and the task must finish with
     * {@code SUCCESS} after handoff.
     */
    @Test(timeout = 60000)
    public void testNoReportParseExceptions() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null, false);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the task has opened its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        // Arrays.asList is used because the list contains a null element.
        firehose.addRows(
            Arrays.asList(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                // Null row.
                null,
                // Non-numeric metric value.
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
                // No timestamp, and carries the TestFirehose.FAIL_DIM key.
                ImmutableMap.of("dim1", "foo", "met1", 2.0d, TestFirehose.FAIL_DIM, "x"),
                // Timestamp one day in the past.
                ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0d),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0d)
            )
        );
        firehose.close();

        // Poll until the task has published something.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        // Ingestion metrics.
        Assert.assertEquals(3L, task.getMetrics().processed());
        Assert.assertEquals(1L, task.getMetrics().thrownAway());
        Assert.assertEquals(2L, task.getMetrics().unparseable());

        // Queryable sums.
        Assert.assertEquals(3L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        // Simulate handoff: every registered callback must be for the published segment.
        handOffCallbacks.forEach((descriptor, callback) -> {
            Assert.assertEquals(
                new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum()
                ),
                descriptor
            );
            callback.lhs.execute(callback.rhs);
        });
        handOffCallbacks.clear();

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * Stops a first task gracefully after one row (nothing must be published), then starts
     * a second task with the same id and the same directory. The second task must already
     * see the first row, accept another one, publish, and succeed after handoff.
     */
    @Test(timeout = 60000)
    public void testRestore() throws Exception {
        final File directory = tempFolder.newFolder();
        final RealtimeIndexTask task1 = makeRealtimeTask(null);

        // First run: ingest one row, then stop gracefully.
        final TestIndexerMetadataStorageCoordinator mdc1 = new TestIndexerMetadataStorageCoordinator();
        final TaskToolbox toolbox1 = makeToolbox(task1, mdc1, directory);
        final ListenableFuture<TaskStatus> statusFuture1 = runTask(task1, toolbox1);

        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose1 = (TestFirehose) task1.getFirehose();
        firehose1.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
            )
        );
        task1.stopGracefully(toolbox1.getConfig());

        final TaskStatus status1 = statusFuture1.get();
        Assert.assertEquals(TaskState.SUCCESS, status1.getStatusCode());
        // The first run must not have published anything.
        Assert.assertEquals(new HashSet(), mdc1.getPublished());

        // Second run: same task id, same directory, fresh coordinator.
        final TestIndexerMetadataStorageCoordinator mdc2 = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
        final TaskToolbox toolbox2 = makeToolbox(task2, mdc2, directory);
        final ListenableFuture<TaskStatus> statusFuture2 = runTask(task2, toolbox2);

        while (task2.getFirehose() == null) {
            Thread.sleep(50L);
        }

        // The row from the first run must already be queryable.
        Assert.assertEquals(1L, sumMetric(task2, null, "rows").longValue());

        final TestFirehose firehose2 = (TestFirehose) task2.getFirehose();
        firehose2.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar")
            )
        );
        firehose2.close();

        while (mdc2.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc2.getPublished());
        Assert.assertEquals(2L, sumMetric(task2, null, "rows").longValue());

        // Simulate handoff: every registered callback must be for the published segment.
        handOffCallbacks.forEach((descriptor, callback) -> {
            Assert.assertEquals(
                new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum()
                ),
                descriptor
            );
            callback.lhs.execute(callback.rhs);
        });
        handOffCallbacks.clear();

        final TaskStatus status2 = statusFuture2.get();
        Assert.assertEquals(TaskState.SUCCESS, status2.getStatusCode());
    }

    /**
     * Lets a first task publish a segment, stops it gracefully before any handoff callback
     * is fired, then starts a second task with the same id, task storage, coordinator and
     * directory. The second task must not publish anything new, must register a handoff
     * callback for the already-published segment, and must succeed once it is fired.
     */
    @Test(timeout = 60000)
    public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception {
        final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig((Period) null));
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final File directory = tempFolder.newFolder();

        // First run: ingest one row, close the firehose and wait for publishing.
        final RealtimeIndexTask task1 = makeRealtimeTask(null);
        final TaskToolbox toolbox1 = makeToolbox(task1, taskStorage, mdc, directory);
        final ListenableFuture<TaskStatus> statusFuture1 = runTask(task1, toolbox1);

        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose1 = (TestFirehose) task1.getFirehose();
        firehose1.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
            )
        );
        firehose1.close();

        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
        Assert.assertEquals(1L, sumMetric(task1, null, "rows").longValue());

        // Stop the first task without firing its handoff callbacks, and wait for it to exit.
        task1.stopGracefully(toolbox1.getConfig());
        while (!statusFuture1.isDone()) {
            Thread.sleep(50L);
        }

        // Second run: same id, storage, coordinator and directory.
        final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
        final TaskToolbox toolbox2 = makeToolbox(task2, taskStorage, mdc, directory);
        final ListenableFuture<TaskStatus> statusFuture2 = runTask(task2, toolbox2);

        while (task2.getFirehose() == null) {
            Thread.sleep(50L);
        }

        // No new rows: close the firehose straight away.
        final TestFirehose firehose2 = (TestFirehose) task2.getFirehose();
        firehose2.close();

        // Nothing beyond the segment from the first run may be published.
        Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished());

        // Wait for the second task to register its handoff callback.
        while (handOffCallbacks.isEmpty()) {
            Thread.sleep(50L);
        }

        // Simulate handoff: every registered callback must be for the published segment.
        handOffCallbacks.forEach((descriptor, callback) -> {
            Assert.assertEquals(
                new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum()
                ),
                descriptor
            );
            callback.lhs.execute(callback.rhs);
        });
        handOffCallbacks.clear();

        final TaskStatus status2 = statusFuture2.get();
        Assert.assertEquals(TaskState.SUCCESS, status2.getStatusCode());
    }

    /**
     * Stops a first task gracefully after one row, overwrites a {@code 00000.smoosh} file
     * under the task's persist directory with garbage, and expects that running a second
     * task with the same id against that directory fails with an exception.
     */
    @Test(timeout = 60000)
    public void testRestoreCorruptData() throws Exception {
        final File directory = tempFolder.newFolder();
        final RealtimeIndexTask task1 = makeRealtimeTask(null);

        // First run: ingest one row, then stop gracefully.
        final TestIndexerMetadataStorageCoordinator mdc1 = new TestIndexerMetadataStorageCoordinator();
        final TaskToolbox toolbox1 = makeToolbox(task1, mdc1, directory);
        final ListenableFuture<TaskStatus> statusFuture1 = runTask(task1, toolbox1);

        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose1 = (TestFirehose) task1.getFirehose();
        firehose1.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
            )
        );
        task1.stopGracefully(toolbox1.getConfig());

        final TaskStatus status1 = statusFuture1.get();
        Assert.assertEquals(TaskState.SUCCESS, status1.getStatusCode());
        // The first run must not have published anything.
        Assert.assertEquals(new HashSet(), mdc1.getPublished());

        // Overwrite the smoosh file with garbage.
        final File smooshFile = new File(
            StringUtils.format(
                "%s/persistent/task/%s/work/persist/%s/%s_%s/0/00000.smoosh",
                directory,
                task1.getId(),
                task1.getDataSource(),
                Granularities.DAY.bucketStart(now),
                Granularities.DAY.bucketEnd(now)
            )
        );
        Files.write(smooshFile.toPath(), StringUtils.toUtf8("oops!"));

        // Second run: any exception from building or running the task counts as the expected failure.
        final IndexerMetadataStorageCoordinator mdc2 = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
        try {
            runTask(task2, makeToolbox(task2, mdc2, directory)).get();
        } catch (Exception expected) {
            return;
        }
        Assert.fail("expected exception");
    }

    /**
     * A task that is asked to stop gracefully before it is ever run must still complete
     * with {@code SUCCESS} once it is run.
     */
    @Test(timeout = 60000)
    public void testStopBeforeStarting() throws Exception {
        final File directory = tempFolder.newFolder();
        final RealtimeIndexTask task = makeRealtimeTask(null);
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final TaskToolbox toolbox = makeToolbox(task, mdc, directory);

        // Stop first, run afterwards.
        task.stopGracefully(toolbox.getConfig());
        final TaskStatus finalStatus = runTask(task, toolbox).get();

        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * Submits {@code task} to the test executor.
     *
     * <p>The submitted job checks {@code task.isReady(...)} with the toolbox's action client
     * and then runs the task. A task that is not ready fails with an {@link ISE}. Any
     * exception is logged as a warning and rethrown, so it surfaces through the returned
     * future.
     *
     * @param task the task to run
     * @param taskToolbox the toolbox handed to the task
     * @return a future for the task's final status
     */
    private ListenableFuture<TaskStatus> runTask(final Task task, final TaskToolbox taskToolbox) {
        return taskExec.submit(() -> {
            try {
                if (!task.isReady(taskToolbox.getTaskActionClient())) {
                    throw new ISE("Task is not ready");
                }
                return task.run(taskToolbox);
            } catch (Exception e) {
                log.warn(e, "Task failed");
                throw e;
            }
        });
    }

    /**
     * Creates a task with no transforms, the boolean flag set to {@code true} and a
     * trailing long argument of zero.
     *
     * @param str the task id, or {@code null}
     * @return the new task
     */
    private RealtimeIndexTask makeRealtimeTask(String str) {
        final TransformSpec noTransforms = TransformSpec.NONE;
        return makeRealtimeTask(str, noTransforms, true, 0L);
    }

    /**
     * Creates a task with no transforms and a trailing long argument of zero.
     *
     * @param str the task id, or {@code null}
     * @param z forwarded as the boolean flag of the four-argument overload (the tests use
     *          it to switch parse-exception reporting on or off)
     * @return the new task
     */
    private RealtimeIndexTask makeRealtimeTask(String str, boolean z) {
        final TransformSpec noTransforms = TransformSpec.NONE;
        return makeRealtimeTask(str, noTransforms, z, 0L);
    }

    /**
     * Builds the {@link RealtimeIndexTask} under test.
     *
     * <p>The task reads from a {@link TestFirehose}, writes to data source {@code "test_ds"}
     * with dimensions {@code dim1}, {@code dim2}, {@code dim1t}, a {@code rows} count and a
     * {@code met1} long sum, and overrides {@code isFirehoseDrainableByClosing} to always
     * return {@code true}.
     *
     * @param str the task id, or {@code null}
     * @param transformSpec the transform spec placed in the data schema
     * @param z boxed and passed to {@link RealtimeTuningConfig} (presumably the
     *          report-parse-exceptions flag; confirm against that constructor)
     * @param j boxed and passed to {@link RealtimeTuningConfig} (presumably the handoff
     *          condition timeout; confirm against that constructor)
     * @return the new task
     */
    private RealtimeIndexTask makeRealtimeTask(String str, TransformSpec transformSpec, boolean z, long j) {
        final MapInputRowParser rowParser = new MapInputRowParser(
            new TimeAndDimsParseSpec(
                new TimestampSpec("t", "auto", (DateTime) null),
                new DimensionsSpec(
                    DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim1t")),
                    (List) null,
                    (List) null
                )
            )
        );
        final AggregatorFactory[] aggregators = {
            new CountAggregatorFactory("rows"),
            new LongSumAggregatorFactory("met1", "met1")
        };
        final DataSchema dataSchema = new DataSchema(
            "test_ds",
            (Map) TestHelper.makeJsonMapper().convertValue(rowParser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT),
            aggregators,
            new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, (List) null),
            transformSpec,
            new DefaultObjectMapper()
        );
        final RealtimeIOConfig ioConfig = new RealtimeIOConfig(
            new TestFirehose.TestFirehoseFactory(),
            (PlumberSchool) null,
            (FirehoseFactoryV2) null
        );
        final RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
            1000,
            (Long) null,
            new Period("P1Y"),
            new Period("PT10M"),
            (File) null,
            (VersioningPolicy) null,
            new ServerTimeRejectionPolicyFactory(),
            (Integer) null,
            (ShardSpec) null,
            (IndexSpec) null,
            true,
            0,
            0,
            Boolean.valueOf(z),
            Long.valueOf(j),
            (Long) null,
            (SegmentWriteOutMediumFactory) null,
            (String) null
        );
        final FireDepartment fireDepartment = new FireDepartment(dataSchema, ioConfig, tuningConfig);

        return new RealtimeIndexTask(str, null, fireDepartment, null) {
            @Override
            protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
                return true;
            }
        };
    }

    /**
     * Builds a toolbox for {@code task} backed by a fresh in-memory task storage.
     *
     * @param task the task the toolbox is built for
     * @param indexerMetadataStorageCoordinator the coordinator used by the task actions
     * @param file the base directory passed to the task config
     * @return the toolbox
     */
    private TaskToolbox makeToolbox(Task task, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, File file) {
        final TaskStorage freshStorage = new HeapMemoryTaskStorage(new TaskStorageConfig((Period) null));
        return makeToolbox(task, freshStorage, indexerMetadataStorageCoordinator, file);
    }

    /**
     * Builds a {@link TaskToolbox} for {@code task}.
     *
     * <p>The task is inserted into {@code taskStorage} as running (an already-existing entry
     * is tolerated so that a restored task can reuse the same storage), a lockbox is synced
     * from that storage, and a local task-action client, a timeseries-only query
     * conglomerate and a recording handoff notifier are wired into the toolbox.
     *
     * <p>Side effect: {@code handOffCallbacks} is replaced with a fresh, empty map on every
     * call. Callbacks registered through the notifier are stored in whichever map the field
     * references at registration time.
     *
     * @param task the task the toolbox is built for
     * @param taskStorage the storage the task is registered in
     * @param indexerMetadataStorageCoordinator the coordinator used by the task actions
     * @param file the base directory passed to the task config
     * @return the toolbox
     */
    private TaskToolbox makeToolbox(Task task, TaskStorage taskStorage, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, File file) {
        final TaskConfig taskConfig = new TaskConfig(file.getPath(), (String) null, (String) null, 50000, (List) null, true, (Period) null, (Period) null);
        final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
        try {
            taskStorage.insert(task, TaskStatus.running(task.getId()));
        } catch (EntryExistsException ignored) {
            // Deliberately ignored: restore tests call this twice with the same task id
            // and the same storage, so the entry may already be present.
        }
        taskLockbox.syncFromStorage();

        final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
            taskLockbox,
            taskStorage,
            indexerMetadataStorageCoordinator,
            emitter,
            (SupervisorManager) EasyMock.createMock(SupervisorManager.class)
        );
        final LocalTaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
            taskStorage,
            taskActionToolbox,
            new TaskAuditLogConfig(false)
        );

        // Only timeseries queries are supported; sumMetric() is the only place that queries the task.
        final DefaultQueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
            ImmutableMap.of(
                TimeseriesQuery.class,
                new TimeseriesQueryRunnerFactory(
                    new TimeseriesQueryQueryToolChest(
                        new IntervalChunkingQueryRunnerDecorator(null, null, null) {
                            @Override
                            public <T> QueryRunner<T> decorate(QueryRunner<T> queryRunner, QueryToolChest<T, ? extends Query<T>> queryToolChest) {
                                // No interval chunking in tests: hand back the runner untouched.
                                return queryRunner;
                            }
                        }
                    ),
                    new TimeseriesQueryEngine(),
                    new QueryWatcher() {
                        @Override
                        public void registerQuery(Query query, ListenableFuture listenableFuture) {
                            // Queries are not tracked in tests.
                        }
                    }
                )
            )
        );

        this.handOffCallbacks = new ConcurrentHashMap<>();
        final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory() {
            @Override
            public SegmentHandoffNotifier createSegmentHandoffNotifier(String str) {
                return new SegmentHandoffNotifier() {
                    @Override
                    public boolean registerSegmentHandoffCallback(SegmentDescriptor segmentDescriptor, Executor executor, Runnable runnable) {
                        // Record the callback so the test can fire it to simulate handoff.
                        RealtimeIndexTaskTest.this.handOffCallbacks.put(segmentDescriptor, new Pair<>(executor, runnable));
                        return true;
                    }

                    @Override
                    public void start() {
                        // Nothing to start.
                    }

                    @Override
                    public void close() {
                        // Nothing to release.
                    }
                };
            }
        };

        final TestUtils testUtils = new TestUtils();
        final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
            taskConfig,
            taskActionClientFactory,
            emitter,
            new TestDataSegmentPusher(),
            new TestDataSegmentKiller(),
            (DataSegmentMover) null,
            (DataSegmentArchiver) null,
            new TestDataSegmentAnnouncer(),
            (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
            handoffNotifierFactory,
            () -> conglomerate,
            Execs.directExecutor(),
            (MonitorScheduler) EasyMock.createMock(MonitorScheduler.class),
            new SegmentLoaderFactory((IndexIO) null, testUtils.getTestObjectMapper()),
            testUtils.getTestObjectMapper(),
            testUtils.getTestIndexIO(),
            MapCache.create(1024L),
            new CacheConfig(),
            new CachePopulatorStats(),
            testUtils.getTestIndexMergerV9(),
            (DruidNodeAnnouncer) EasyMock.createNiceMock(DruidNodeAnnouncer.class),
            (DruidNode) EasyMock.createNiceMock(DruidNode.class),
            new LookupNodeService("tier"),
            new DataNodeService("tier", 1000L, ServerType.INDEXER_EXECUTOR, 0),
            new NoopTestTaskFileWriter()
        );
        return toolboxFactory.build(task);
    }

    /**
     * Sums a metric by running a timeseries query (all granularity, interval
     * {@code 2000/3000}, data source {@code "test_ds"}) against the task's own query runner.
     *
     * @param task the task to query
     * @param dimFilter optional filter applied to the query; may be {@code null}
     * @param str the name of the metric to sum
     * @return {@code 0} if the query yields no result rows, otherwise the value of the
     *         metric in the first result, which may be {@code null}
     */
    @Nullable
    public Long sumMetric(Task task, DimFilter dimFilter, String str) {
        final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("test_ds")
            .filters(dimFilter)
            .aggregators(ImmutableList.of(new LongSumAggregatorFactory(str, str)))
            .granularity(Granularities.ALL)
            .intervals("2000/3000")
            .build();

        final List<Result<TimeseriesResultValue>> results =
            task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();

        // Kept as an if statement: a conditional expression mixing 0L with a Long would
        // unbox the metric and throw on a null value.
        if (!results.isEmpty()) {
            return results.get(0).getValue().getLongMetric(str);
        }
        return 0L;
    }
}
