package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.indexing.test.TestDataSegmentPusher;
import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.class */
public class AppenderatorDriverRealtimeIndexTaskTest {
    // Logger bound to this test class.
    private static final Logger log = new Logger(AppenderatorDriverRealtimeIndexTaskTest.class);
    // Service emitter wrapping a NoopEmitter; setUp() registers it with EmittingLogger and starts it.
    private static final ServiceEmitter EMITTER = new ServiceEmitter("service", "host", new NoopEmitter());
    // JSON mapper obtained from TestHelper.
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
    // Row field name. TestFirehose.nextRow() throws a ParseException for rows that carry this same "__fail__" key.
    private static final String FAIL_DIM = "__fail__";

    // JUnit rule initialised to "no exception expected".
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    // Temporary folder rule; setUp() takes baseDir from it.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    // Derby connector rule; setUp() uses its connector to create the data source, task, segment
    // and pending-segment tables.
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    // Captured in setUp() via DateTimes.nowUtc(); tests use it to build row timestamps.
    private DateTime now;
    // Single-threaded listening executor created in setUp() and shut down in tearDown().
    private ListeningExecutorService taskExec;
    // Handoff callbacks keyed by segment descriptor; tests look up each published segment here and
    // run the stored Runnable on the stored Executor.
    // NOTE(review): populated outside the visible part of this file - presumably by the handoff notifier; confirm.
    private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
    // NOTE(review): not written in the visible part of this file - presumably maintained by
    // expectPublishedSegments()/awaitSegments()/awaitHandoffs(); confirm.
    private Collection<DataSegment> publishedSegments;
    private CountDownLatch segmentLatch;
    private CountDownLatch handoffLatch;
    // NOTE(review): not assigned in the visible part of this file - presumably set by makeToolboxFactory(); confirm.
    private TaskStorage taskStorage;
    private TaskLockbox taskLockbox;
    private TaskToolboxFactory taskToolboxFactory;
    // Fresh folder from tempFolder, created in setUp() and handed to makeToolboxFactory().
    private File baseDir;
    // Temp file created in setUp() and deleted in tearDown().
    private File reportsFile;
    // NOTE(review): not assigned in the visible part of this file; confirm where it is initialised.
    private RowIngestionMetersFactory rowIngestionMetersFactory;
    // Set to a new TestAppenderatorsManager in setUp().
    private AppenderatorsManager appenderatorsManager;

    /* loaded from: input_file:org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest$TestFirehose.class */
    /**
     * In-memory {@link Firehose} driven by the tests: rows are pushed in with {@link #addRows(List)}
     * and pulled out by the consumer through {@link #hasMore()} / {@link #nextRow()}.
     *
     * <p>All state is guarded by this object's monitor. {@link #hasMore()} blocks until either a row
     * is queued or the firehose is closed.
     */
    private static class TestFirehose implements Firehose {
        private final InputRowParser<Map<String, Object>> parser;
        // Rows are wrapped in Optional because ArrayDeque rejects null elements, while tests
        // deliberately enqueue null rows to simulate unparseable input.
        private final Deque<Optional<Map<String, Object>>> queue = new ArrayDeque<>();
        private boolean closed = false;

        /**
         * @param inputRowParser parser applied to each queued raw row in {@link #nextRow()}
         */
        public TestFirehose(InputRowParser<Map<String, Object>> inputRowParser) {
            this.parser = inputRowParser;
        }

        /**
         * Appends the given raw rows (null entries allowed) to the queue and wakes any thread
         * blocked in {@link #hasMore()}.
         *
         * @param list raw rows to enqueue, in order
         */
        public void addRows(List<Map<String, Object>> list) {
            synchronized (this) {
                list.stream().map(Optional::ofNullable).forEach(this.queue::add);
                notifyAll();
            }
        }

        /**
         * Blocks until a row is available or the firehose has been closed.
         *
         * @return {@code true} if at least one row is queued, {@code false} if the firehose was
         *         closed with an empty queue
         * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread
         *         is interrupted; the interrupt flag is restored first
         */
        public boolean hasMore() {
            try {
                synchronized (this) {
                    while (this.queue.isEmpty() && !this.closed) {
                        wait();
                    }
                    return !this.queue.isEmpty();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Removes the head of the queue, parses it and returns the first parsed row.
         *
         * @return the parsed row
         * @throws ParseException if the parsed row carries a non-null {@code "__fail__"} field
         *         (the same key as the outer class's {@code FAIL_DIM} constant)
         */
        public InputRow nextRow() {
            synchronized (this) {
                final InputRow row = this.parser.parseBatch(this.queue.removeFirst().orElse(null)).get(0);
                if (row != null && row.getRaw("__fail__") != null) {
                    throw new ParseException("__fail__");
                }
                return row;
            }
        }

        /**
         * Marks the firehose closed and wakes any thread blocked in {@link #hasMore()}.
         * Rows already queued remain readable.
         */
        public void close() {
            synchronized (this) {
                this.closed = true;
                notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest$TestFirehoseFactory.class */
    /**
     * Firehose factory for the tests: every {@code connect} call hands back a brand-new
     * {@link TestFirehose} wrapping the supplied parser. The {@code file} argument is not used.
     */
    public static class TestFirehoseFactory implements FirehoseFactory<InputRowParser> {
        public Firehose connect(InputRowParser inputRowParser, File file) throws ParseException {
            final TestFirehose firehose = new TestFirehose(inputRowParser);
            return firehose;
        }
    }

    /**
     * Per-test fixture: starts the emitter, creates the task executor, captures the current time,
     * creates the Derby metadata tables, and prepares the working directory, reports file and
     * toolbox factory.
     *
     * @throws IOException if the temporary folder or the reports file cannot be created
     */
    @Before
    public void setUp() throws IOException {
        // Emitter must be registered with the logger before it is started.
        EmittingLogger.registerEmitter(EMITTER);
        EMITTER.start();

        taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d"));
        now = DateTimes.nowUtc();

        // Metadata tables backing the task and segment stores.
        final TestDerbyConnector derby = derbyConnectorRule.getConnector();
        derby.createDataSourceTable();
        derby.createTaskTables();
        derby.createSegmentTable();
        derby.createPendingSegmentsTable();

        appenderatorsManager = new TestAppenderatorsManager();

        baseDir = tempFolder.newFolder();
        final String reportsPrefix = "KafkaIndexTaskTestReports-" + System.currentTimeMillis();
        reportsFile = File.createTempFile(reportsPrefix, "json");
        makeToolboxFactory(baseDir);
    }

    /**
     * Stops the task executor and removes the reports file.
     *
     * <p>JUnit invokes {@code @After} methods even when {@code setUp()} threw part-way through, so
     * either field may still be null here; the guards keep a failed set-up from being compounded
     * by a {@link NullPointerException} raised during clean-up.
     */
    @After
    public void tearDown() {
        if (this.taskExec != null) {
            this.taskExec.shutdownNow();
        }
        if (this.reportsFile != null) {
            // Best-effort clean-up; the result is deliberately ignored.
            this.reportsFile.delete();
        }
    }

    /**
     * A task built with a null id argument must report its own id as the availability group
     * of its task resource.
     */
    @Test(timeout = 60000)
    public void testDefaultResource() {
        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
        final String expectedGroup = task.getId();
        Assert.assertEquals(expectedGroup, task.getTaskResource().getAvailabilityGroup());
    }

    /**
     * Feeds a single row, closes the firehose and never fires the handoff callback; the task is
     * expected to finish with a timeout error message.
     */
    @Test(timeout = 60000)
    public void testHandoffTimeout() throws Exception {
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(null, TransformSpec.NONE, true, 100L, true, 0, 1);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        final Map<String, Object> row = ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1");
        firehose.addRows(ImmutableList.of(row));

        // No more data.
        firehose.close();

        final TaskStatus status = statusFuture.get();
        Assert.assertTrue(
                status.getErrorMsg().contains("java.util.concurrent.TimeoutException: Timeout waiting for task."));
    }

    /**
     * Happy path: two rows in, one published segment, row meters and queryable metrics match,
     * and the task succeeds once the handoff callbacks are fired.
     */
    @Test(timeout = 60000)
    public void testBasics() throws Exception {
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
        Assert.assertTrue(task.supportsQueries());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        final Map<String, Object> fooRow = ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1");
        final Map<String, Object> barRow = ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0);
        firehose.addRows(ImmutableList.of(fooRow, barRow));

        // No more data.
        firehose.close();

        // Wait for publish.
        final Collection<DataSegment> segments = awaitSegments();

        // Row meters.
        Assert.assertEquals(2L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(2L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        // Simulate handoff for every published segment.
        for (DataSegment segment : segments) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            final Pair<Executor, Runnable> callback = handOffCallbacks.get(descriptor);
            Assert.assertNotNull(segment + " missing from handoff callbacks: " + handOffCallbacks, callback);
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * A row timestamped two days in the past is ingested alongside a current row: both count
     * as processed and none are thrown away.
     */
    @Test(timeout = 60000)
    public void testLateData() throws Exception {
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        final Map<String, Object> currentRow = ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1");
        // Two days old.
        final Map<String, Object> lateRow =
                ImmutableMap.of("t", now.minus(new Period("P2D")).getMillis(), "dim2", "bar", "met1", 2.0);
        firehose.addRows(ImmutableList.of(currentRow, lateRow));

        // No more data.
        firehose.close();

        // Wait for publish.
        final Collection<DataSegment> segments = awaitSegments();

        // Row meters.
        Assert.assertEquals(2L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(2L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        // Simulate handoff for every published segment.
        for (DataSegment segment : segments) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            final Pair<Executor, Runnable> callback = handOffCallbacks.get(descriptor);
            Assert.assertNotNull(segment + " missing from handoff callbacks: " + handOffCallbacks, callback);
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * Pushes 2000 distinct rows through a default task and expects them to be published as two
     * segments, with every row processed and queryable.
     */
    @Test(timeout = 60000)
    public void testMaxRowsPerSegment() throws Exception {
        // Expect 2 segments as we will hit maxRowsPerSegment.
        expectPublishedSegments(2);
        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        // One addRows call per row, each with a unique dim1 value.
        for (int rowIdx = 0; rowIdx < 2000; rowIdx++) {
            final Map<String, Object> row =
                    ImmutableMap.of("t", now.getMillis(), "dim1", "foo-" + rowIdx, "met1", "1");
            firehose.addRows(ImmutableList.of(row));
        }

        // No more data.
        firehose.close();

        // Wait for publish.
        final Collection<DataSegment> segments = awaitSegments();

        // Row meters.
        Assert.assertEquals(2000L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(2000L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(2000L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        // Simulate handoff for every published segment.
        for (DataSegment segment : segments) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            final Pair<Executor, Runnable> callback = handOffCallbacks.get(descriptor);
            Assert.assertNotNull(segment + " missing from handoff callbacks: " + handOffCallbacks, callback);
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * Builds the task with arguments {@code (null, Integer.MAX_VALUE, 1500L)}, pushes 2000 rows
     * and expects exactly two published segments with every row processed and queryable.
     */
    @Test(timeout = 60000)
    public void testMaxTotalRows() throws Exception {
        // Expect 2 segments as we will hit maxTotalRows.
        expectPublishedSegments(2);
        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null, Integer.MAX_VALUE, 1500L);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        // One addRows call per row, each with a unique dim1 value.
        for (int rowIdx = 0; rowIdx < 2000; rowIdx++) {
            final Map<String, Object> row =
                    ImmutableMap.of("t", now.getMillis(), "dim1", "foo-" + rowIdx, "met1", "1");
            firehose.addRows(ImmutableList.of(row));
        }

        // No more data.
        firehose.close();

        // Wait for publish.
        final Collection<DataSegment> segments = awaitSegments();

        // Row meters.
        Assert.assertEquals(2000L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(2000L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(2000L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        Assert.assertEquals(2L, segments.size());

        // Simulate handoff for every published segment.
        for (DataSegment segment : segments) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            final Pair<Executor, Runnable> callback = handOffCallbacks.get(descriptor);
            Assert.assertNotNull(segment + " missing from handoff callbacks: " + handOffCallbacks, callback);
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * Runs the task with a transform spec that keeps only rows where {@code dim1 == "foo"} and adds
     * {@code dim1t = concat(dim1,dim1)}. Of three input rows, two pass the filter (landing on two
     * different days) and one is thrown away.
     */
    @Test(timeout = 60000)
    public void testTransformSpec() throws Exception {
        expectPublishedSegments(2);

        final DimFilter keepFoo = new SelectorDimFilter("dim1", "foo", (ExtractionFn) null);
        final TransformSpec transformSpec = new TransformSpec(
                keepFoo,
                ImmutableList.of(new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())));
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(null, transformSpec, true, 0L, true, 0, 1);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        final Map<String, Object> fooToday = ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1");
        final Map<String, Object> fooYesterday =
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0);
        // No dim1, so the filter rejects this row.
        final Map<String, Object> barToday = ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0);
        firehose.addRows(ImmutableList.of(fooToday, fooYesterday, barToday));

        // No more data.
        firehose.close();

        // Wait for publish.
        final Collection<DataSegment> segments = awaitSegments();

        // Row meters.
        Assert.assertEquals(2L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(1L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(2L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(
                2L,
                sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", (ExtractionFn) null), "rows").longValue());
        final DimFilter barbarFilter = new SelectorDimFilter("dim1t", "barbar", (ExtractionFn) null);
        if (!NullHandling.replaceWithDefault()) {
            Assert.assertNull(sumMetric(task, barbarFilter, "metric1"));
        } else {
            Assert.assertEquals(0L, sumMetric(task, barbarFilter, "metric1").longValue());
        }
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        // Simulate handoff for every published segment.
        for (DataSegment segment : segments) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            final Pair<Executor, Runnable> callback = handOffCallbacks.get(descriptor);
            Assert.assertNotNull(segment + " missing from handoff callbacks: " + handOffCallbacks, callback);
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * With the task built via {@code makeRealtimeTask(null, true)}, a row whose {@code met1} value
     * is not numeric must fail the task with a "Max parse exceptions exceeded" error, and the task
     * report must list exactly that row as unparseable.
     */
    @Test(timeout = 60000)
    public void testReportParseExceptionsOnBadMetric() throws Exception {
        expectPublishedSegments(0);
        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null, true);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        final Map<String, Object> goodRow = ImmutableMap.of("t", 2000000L, "dim1", "foo", "met1", "1");
        final Map<String, Object> badMetricRow = ImmutableMap.of("t", 3000000L, "dim1", "foo", "met1", "foo");
        final Map<String, Object> badMetricYesterday =
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo");
        final Map<String, Object> trailingRow = ImmutableMap.of("t", 4000000L, "dim2", "bar", "met1", 2.0);
        firehose.addRows(ImmutableList.of(goodRow, badMetricRow, badMetricYesterday, trailingRow));

        // No more data.
        firehose.close();

        // The task is expected to finish with a failure message.
        final TaskStatus status = statusFuture.get();
        Assert.assertTrue(
                status.getErrorMsg()
                        .contains("java.lang.RuntimeException: Max parse exceptions exceeded, terminating task..."));

        final Map<String, Object> expectedUnparseables = ImmutableMap.of(
                "buildSegments",
                Collections.singletonList(
                        "Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:50:00.000Z, event={t=3000000, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]"));
        Assert.assertEquals(expectedUnparseables, getTaskReportData().getUnparseableEvents());
    }

    /**
     * Feeds a mix of good rows, a null row, a row with a non-numeric metric and a row that makes
     * the firehose throw; the task must still succeed with one segment and report the expected
     * processed / processedWithError / unparseable / thrownAway counts.
     */
    @Test(timeout = 60000)
    public void testNoReportParseExceptions() throws Exception {
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(null, TransformSpec.NONE, false, 0L, true, null, 1);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        // Good row - will be processed.
        final Map<String, Object> goodFoo = ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1");
        // Non-numeric metric value.
        final Map<String, Object> badMetric = ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo");
        // Carries "__fail__", so the firehose throws a ParseException for it.
        final Map<String, Object> failRow = ImmutableMap.of("dim1", "foo", "met1", 2.0, "__fail__", "x");
        // Good row - will be processed.
        final Map<String, Object> goodBar = ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0);
        // Arrays.asList is used because the list contains a null row.
        firehose.addRows(Arrays.asList(goodFoo, null, badMetric, failRow, goodBar));

        // No more data.
        firehose.close();

        // Wait for publish.
        final DataSegment segment = Iterables.getOnlyElement(awaitSegments());

        // Row meters.
        Assert.assertEquals(2L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(1L, task.getRowIngestionMeters().getProcessedWithError());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(2L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(3L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        // Simulate handoff; every registered callback must belong to the single published segment.
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final Map<String, Object> expectedRowStats = ImmutableMap.of(
                "buildSegments",
                ImmutableMap.of("processed", 2, "processedWithError", 1, "unparseable", 2, "thrownAway", 0));

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
        Assert.assertEquals(expectedRowStats, getTaskReportData().getRowStats());
    }

    /**
     * Same bad-row mix as the failure variant, but the task is built with trailing arguments
     * {@code 10, 10}; it must succeed, publish one segment, and report the row counts, the four
     * unparseable-event messages and a COMPLETED ingestion state.
     */
    @Test(timeout = 60000)
    public void testMultipleParseExceptionsSuccess() throws Exception {
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(null, TransformSpec.NONE, false, 0L, true, 10, 10);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        // Good row - will be processed.
        final Map<String, Object> goodFoo = ImmutableMap.of("t", 1521251960729L, "dim1", "foo", "met1", "1");
        // Non-numeric metric value.
        final Map<String, Object> badMetric = ImmutableMap.of("t", 1521251960729L, "dim1", "foo", "met1", "foo");
        // Non-numeric long dimension, float dimension and metric.
        final Map<String, Object> badDimsAndMetric = ImmutableMap.of(
                "t", 1521251960729L, "dim1", "foo", "dimLong", "notnumber", "dimFloat", "notnumber", "met1", "foo");
        // Carries "__fail__", so the firehose throws a ParseException for it.
        final Map<String, Object> failRow = ImmutableMap.of("dim1", "foo", "met1", 2.0, "__fail__", "x");
        // Good row - will be processed.
        final Map<String, Object> goodBar = ImmutableMap.of("t", 1521251960729L, "dim2", "bar", "met1", 2.0);
        // Arrays.asList is used because the list contains a null row.
        firehose.addRows(Arrays.asList(goodFoo, null, badMetric, badDimsAndMetric, failRow, goodBar));

        // No more data.
        firehose.close();

        // Wait for publish.
        final DataSegment segment = Iterables.getOnlyElement(awaitSegments());

        // Row meters.
        Assert.assertEquals(2L, task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals(2L, task.getRowIngestionMeters().getProcessedWithError());
        Assert.assertEquals(0L, task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals(2L, task.getRowIngestionMeters().getUnparseable());

        // Query results.
        Assert.assertEquals(4L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        awaitHandoffs();

        // Simulate handoff; every registered callback must belong to the single published segment.
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
                    segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum());
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final Map<String, Object> expectedRowStats = ImmutableMap.of(
                "buildSegments",
                ImmutableMap.of("processed", 2, "processedWithError", 2, "unparseable", 2, "thrownAway", 0));

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());

        final IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
        Assert.assertEquals(expectedRowStats, reportData.getRowStats());

        final Map<String, Object> expectedUnparseables = ImmutableMap.of(
                "buildSegments",
                Arrays.asList(
                        "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
                        "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
                        "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]",
                        "Unparseable timestamp found! Event: null"));
        Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
        Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
    }

    /**
     * Same bad-row mix as the success variant, but the task is built with trailing arguments
     * {@code 3, 10}; it must fail with "Max parse exceptions exceeded", and the report must carry
     * the row counts, the four unparseable-event messages and a BUILD_SEGMENTS ingestion state.
     */
    @Test(timeout = 60000)
    public void testMultipleParseExceptionsFailure() throws Exception {
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(null, TransformSpec.NONE, false, 0L, true, 3, 10);
        final ListenableFuture<TaskStatus> statusFuture = runTask(task);

        // The firehose is null until the task has connected it; poll until it appears.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose firehose = (TestFirehose) task.getFirehose();

        // Good row - will be processed.
        final Map<String, Object> goodFoo = ImmutableMap.of("t", 1521251960729L, "dim1", "foo", "met1", "1");
        // Non-numeric metric value.
        final Map<String, Object> badMetric = ImmutableMap.of("t", 1521251960729L, "dim1", "foo", "met1", "foo");
        // Non-numeric long dimension, float dimension and metric.
        final Map<String, Object> badDimsAndMetric = ImmutableMap.of(
                "t", 1521251960729L, "dim1", "foo", "dimLong", "notnumber", "dimFloat", "notnumber", "met1", "foo");
        // Carries "__fail__", so the firehose throws a ParseException for it.
        final Map<String, Object> failRow = ImmutableMap.of("dim1", "foo", "met1", 2.0, "__fail__", "x");
        // Last row in the batch; the asserted "processed" count of 1 implies it is never ingested.
        final Map<String, Object> goodBar = ImmutableMap.of("t", 1521251960729L, "dim2", "bar", "met1", 2.0);
        // Arrays.asList is used because the list contains a null row.
        firehose.addRows(Arrays.asList(goodFoo, null, badMetric, badDimsAndMetric, failRow, goodBar));

        // No more data.
        firehose.close();

        // Wait for the task to finish.
        final TaskStatus status = statusFuture.get();
        Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
        Assert.assertTrue(status.getErrorMsg().contains("Max parse exceptions exceeded, terminating task..."));

        final IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();

        final Map<String, Object> expectedRowStats = ImmutableMap.of(
                "buildSegments",
                ImmutableMap.of("processed", 1, "processedWithError", 2, "unparseable", 2, "thrownAway", 0));
        Assert.assertEquals(expectedRowStats, reportData.getRowStats());

        final Map<String, Object> expectedUnparseables = ImmutableMap.of(
                "buildSegments",
                Arrays.asList(
                        "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
                        "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
                        "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]",
                        "Unparseable timestamp found! Event: null"));
        Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
        Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
    }

    /**
     * Runs a task, stops it gracefully after one row without anything being published, then starts a second
     * task with the same id. The restored task must already report one row, accept a further row, publish a
     * single segment and finish successfully once the registered handoff callbacks have been executed.
     */
    @Test(timeout = 60000)
    public void testRestore() throws Exception {
        // First run: ingest one row, then stop before anything is published.
        expectPublishedSegments(0);
        final AppenderatorDriverRealtimeIndexTask firstTask = makeRealtimeTask(null);
        final ListenableFuture<TaskStatus> firstFuture = runTask(firstTask);

        // Poll until the running task exposes its firehose.
        while (firstTask.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firstFirehose = (TestFirehose) firstTask.getFirehose();
        firstFirehose.addRows(ImmutableList.of(ImmutableMap.of("t", Long.valueOf(this.now.getMillis()), "dim1", "foo")));

        firstTask.stopGracefully(this.taskToolboxFactory.build(firstTask).getConfig());
        Assert.assertEquals(TaskState.SUCCESS, firstFuture.get().getStatusCode());
        Assert.assertTrue(this.publishedSegments.isEmpty());

        // Second run: same task id, so the previously ingested row must still be queryable.
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask restoredTask = makeRealtimeTask(firstTask.getId());
        final ListenableFuture<TaskStatus> restoredFuture = runTask(restoredTask);

        while (restoredTask.getFirehose() == null) {
            Thread.sleep(50L);
        }

        Assert.assertEquals(1L, sumMetric(restoredTask, null, "rows").longValue());

        final TestFirehose restoredFirehose = (TestFirehose) restoredTask.getFirehose();
        restoredFirehose.addRows(ImmutableList.of(ImmutableMap.of("t", Long.valueOf(this.now.getMillis()), "dim2", "bar")));
        restoredFirehose.close();

        final DataSegment publishedSegment = Iterables.getOnlyElement(awaitSegments());
        Assert.assertEquals(2L, sumMetric(restoredTask, null, "rows").longValue());

        // Run every registered handoff callback; each must have been registered for the published segment.
        awaitHandoffs();
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> registration : this.handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = registration.getValue();
            final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum());
            Assert.assertEquals(expectedDescriptor, registration.getKey());
            callback.lhs.execute(callback.rhs);
        }
        this.handOffCallbacks.clear();

        Assert.assertEquals(TaskState.SUCCESS, restoredFuture.get().getStatusCode());
    }

    /**
     * Lets a task publish one segment, stops it gracefully and waits for the run to finish, then starts a
     * second task with the same id. After the second task's firehose is closed, the registered handoff
     * callbacks are executed and the second run is expected to end in {@code SUCCESS}.
     */
    @Test(timeout = 60000)
    public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception {
        final AppenderatorDriverRealtimeIndexTask firstTask = makeRealtimeTask(null);
        expectPublishedSegments(1);
        final ListenableFuture<TaskStatus> firstFuture = runTask(firstTask);

        // Poll until the running task exposes its firehose.
        while (firstTask.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firstFirehose = (TestFirehose) firstTask.getFirehose();
        firstFirehose.addRows(ImmutableList.of(ImmutableMap.of("t", Long.valueOf(this.now.getMillis()), "dim1", "foo")));
        firstFirehose.close();

        final DataSegment publishedSegment = Iterables.getOnlyElement(awaitSegments());
        Assert.assertEquals(1L, sumMetric(firstTask, null, "rows").longValue());

        // Ask the task to stop and wait for the run to complete; its status is intentionally not inspected.
        firstTask.stopGracefully(this.taskToolboxFactory.build(firstTask).getConfig());
        while (!firstFuture.isDone()) {
            Thread.sleep(50L);
        }

        // Restart under the same id and immediately close the new firehose.
        expectPublishedSegments(1);
        final AppenderatorDriverRealtimeIndexTask restoredTask = makeRealtimeTask(firstTask.getId());
        final ListenableFuture<TaskStatus> restoredFuture = runTask(restoredTask);

        while (restoredTask.getFirehose() == null) {
            Thread.sleep(50L);
        }
        final TestFirehose restoredFirehose = (TestFirehose) restoredTask.getFirehose();
        restoredFirehose.close();

        // Run every registered handoff callback; each must have been registered for the published segment.
        awaitHandoffs();
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> registration : this.handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = registration.getValue();
            final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
                    publishedSegment.getInterval(),
                    publishedSegment.getVersion(),
                    publishedSegment.getShardSpec().getPartitionNum());
            Assert.assertEquals(expectedDescriptor, registration.getKey());
            callback.lhs.execute(callback.rhs);
        }
        this.handOffCallbacks.clear();

        Assert.assertEquals(TaskState.SUCCESS, restoredFuture.get().getStatusCode());
    }

    /**
     * Stops a task gracefully after one row, overwrites the first {@code 00000.smoosh} file found under
     * {@code baseDir} with garbage, and then runs a task with the same id again. The second run must report
     * all-zero row statistics and an error message that matches an {@code IllegalArgumentException} raised
     * from {@code java.nio.Buffer}.
     */
    @Test(timeout = 60000)
    public void testRestoreCorruptData() throws Exception {
        final AppenderatorDriverRealtimeIndexTask firstTask = makeRealtimeTask(null);
        expectPublishedSegments(0);
        final ListenableFuture<TaskStatus> firstFuture = runTask(firstTask);

        // Poll until the running task exposes its firehose.
        while (firstTask.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) firstTask.getFirehose();
        firehose.addRows(ImmutableList.of(ImmutableMap.of("t", Long.valueOf(this.now.getMillis()), "dim1", "foo")));

        firstTask.stopGracefully(this.taskToolboxFactory.build(firstTask).getConfig());
        Assert.assertEquals(TaskState.SUCCESS, firstFuture.get().getStatusCode());
        Assert.assertTrue(this.publishedSegments.isEmpty());

        // Corrupt the data left on disk by the first run.
        final Optional<File> smooshFile = FileUtils.listFiles(this.baseDir, (String[]) null, true)
                .stream()
                .filter(candidate -> candidate.getName().equals("00000.smoosh"))
                .findFirst();
        Assert.assertTrue("Could not find smoosh file", smooshFile.isPresent());
        Files.write(smooshFile.get().toPath(), StringUtils.toUtf8("oops!"));

        // Restore under the same id and inspect how the run ended.
        expectPublishedSegments(0);
        final AppenderatorDriverRealtimeIndexTask restoredTask = makeRealtimeTask(firstTask.getId());
        final TaskStatus restoredStatus = runTask(restoredTask).get();

        final Map<String, Map<String, Integer>> expectedRowStats = ImmutableMap.of(
                "buildSegments",
                ImmutableMap.of("processedWithError", 0, "processed", 0, "unparseable", 0, "thrownAway", 0));
        Assert.assertEquals(expectedRowStats, getTaskReportData().getRowStats());

        final Pattern expectedError =
                Pattern.compile("(?s)java\\.lang\\.IllegalArgumentException.*\n\tat (java\\.base/)?java\\.nio\\.Buffer\\..*");
        Assert.assertTrue(expectedError.matcher(restoredStatus.getErrorMsg()).matches());
    }

    /**
     * Requests a graceful stop before the task has been submitted and verifies that the subsequent run still
     * ends in {@code SUCCESS} without any segment being expected.
     */
    @Test(timeout = 60000)
    public void testStopBeforeStarting() throws Exception {
        expectPublishedSegments(0);

        final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
        task.stopGracefully(this.taskToolboxFactory.build(task).getConfig());

        final TaskStatus status = runTask(task).get();
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * Registers {@code task} in task storage as running, re-syncs the lockbox from storage and submits the
     * task to {@code taskExec}.
     *
     * @param task the task to execute
     * @return a future for the task's status; it fails if the task is not ready or if running it throws
     */
    private ListenableFuture<TaskStatus> runTask(Task task) {
        try {
            this.taskStorage.insert(task, TaskStatus.running(task.getId()));
        } catch (EntryExistsException ignored) {
            // Deliberately ignored: the restore tests run a second task under an id that is already stored.
        }
        this.taskLockbox.syncFromStorage();

        final TaskToolbox toolbox = this.taskToolboxFactory.build(task);
        return this.taskExec.submit(() -> {
            try {
                if (!task.isReady(toolbox.getTaskActionClient())) {
                    throw new ISE("Task is not ready");
                }
                return task.run(toolbox);
            } catch (Exception e) {
                // Log here as well as rethrowing, so the failure is visible even if the future is never read.
                log.warn(e, "Task failed");
                throw e;
            }
        });
    }

    /**
     * Creates a task with the given id using {@link TransformSpec#NONE} and the fixed arguments
     * {@code true, 0L, true, 0, 1} of the seven-argument overload.
     *
     * @param str task id, or {@code null}
     */
    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String str) {
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(str, TransformSpec.NONE, true, 0L, true, 0, 1);
        return task;
    }

    /**
     * Creates a task with the given id, forwarding {@code num} and {@code l} as the last two arguments of the
     * nine-argument overload; all other settings match {@code makeRealtimeTask(String)}.
     *
     * @param str task id, or {@code null}
     * @param num value passed as the overload's second-to-last argument
     * @param l   value passed as the overload's last argument
     */
    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String str, Integer num, Long l) {
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(str, TransformSpec.NONE, true, 0L, true, 0, 1, num, l);
        return task;
    }

    /**
     * Creates a task with the given id where only the boolean flag is chosen by the caller. Unlike
     * {@code makeRealtimeTask(String)}, the second-to-last argument of the seven-argument overload is
     * {@code null} rather than {@code 0}.
     *
     * @param str task id, or {@code null}
     * @param z   flag forwarded as the overload's third argument
     */
    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String str, boolean z) {
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(str, TransformSpec.NONE, z, 0L, true, null, 1);
        return task;
    }

    /**
     * Seven-argument convenience overload: forwards every argument unchanged to the nine-argument overload and
     * supplies {@code 1000} and {@code null} for its last two parameters.
     */
    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String str, TransformSpec transformSpec, boolean z, long j, Boolean bool, Integer num, Integer num2) {
        final AppenderatorDriverRealtimeIndexTask task =
                makeRealtimeTask(str, transformSpec, z, j, bool, num, num2, 1000, null);
        return task;
    }

    /**
     * Builds the task under test for datasource {@code test_ds}.
     *
     * <p>The schema uses timestamp column {@code t}, string dimensions {@code dim1}, {@code dim2} and
     * {@code dim1t}, a long dimension {@code dimLong}, a float dimension {@code dimFloat}, a {@code rows}
     * count aggregator and a {@code met1} long-sum aggregator. Input comes from a {@code TestFirehoseFactory}.
     * The returned task overrides {@code isFirehoseDrainableByClosing} to always answer {@code true}.
     *
     * @param str           task id, or {@code null}
     * @param transformSpec transform spec placed in the data schema
     * @param z             forwarded (boxed) as the 11th tuning-config constructor argument
     * @param j             forwarded (boxed) as the 12th tuning-config constructor argument
     * @param bool          forwarded as the 15th tuning-config constructor argument
     * @param num           forwarded as the 16th tuning-config constructor argument
     * @param num2          forwarded as the 17th tuning-config constructor argument
     * @param num3          forwarded as the 3rd tuning-config constructor argument
     * @param l             forwarded as the 4th tuning-config constructor argument
     */
    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String str, TransformSpec transformSpec, boolean z, long j, Boolean bool, Integer num, Integer num2, Integer num3, Long l) {
        // Row parser, serialized to a generic map because that is the form the DataSchema constructor is given.
        final MapInputRowParser rowParser = new MapInputRowParser(
                new TimeAndDimsParseSpec(
                        new TimestampSpec("t", "auto", (DateTime) null),
                        new DimensionsSpec(
                                ImmutableList.of(
                                        new StringDimensionSchema("dim1"),
                                        new StringDimensionSchema("dim2"),
                                        new StringDimensionSchema("dim1t"),
                                        new LongDimensionSchema("dimLong"),
                                        new FloatDimensionSchema("dimFloat")),
                                (List) null,
                                (List) null)));
        final Map<String, Object> parserMap =
                TestHelper.makeJsonMapper().convertValue(rowParser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);

        final DataSchema dataSchema = new DataSchema(
                "test_ds",
                parserMap,
                new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")},
                new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, (List) null),
                transformSpec,
                OBJECT_MAPPER);

        final RealtimeIOConfig ioConfig = new RealtimeIOConfig(new TestFirehoseFactory(), (PlumberSchool) null);

        final RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(
                1000,
                (Long) null,
                num3,
                l,
                (Period) null,
                (File) null,
                (Integer) null,
                (ShardSpec) null,
                (IndexSpec) null,
                (IndexSpec) null,
                Boolean.valueOf(z),
                Long.valueOf(j),
                (Long) null,
                (SegmentWriteOutMediumFactory) null,
                bool,
                num,
                num2);

        final RealtimeAppenderatorIngestionSpec ingestionSpec =
                new RealtimeAppenderatorIngestionSpec(dataSchema, ioConfig, tuningConfig);

        return new AppenderatorDriverRealtimeIndexTask(
                str,
                null,
                ingestionSpec,
                null,
                null,
                AuthTestUtils.TEST_AUTHORIZER_MAPPER,
                this.rowIngestionMetersFactory,
                this.appenderatorsManager) {
            // The tests end ingestion by closing the firehose, so always report it as drainable that way.
            protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
                return true;
            }
        };
    }

    /**
     * Replaces both latches with fresh ones initialised to {@code i}: {@code segmentLatch} is awaited by
     * {@code awaitSegments()} and {@code handoffLatch} by {@code awaitHandoffs()}.
     *
     * @param i initial count for both latches
     */
    private void expectPublishedSegments(int i) {
        final CountDownLatch freshSegmentLatch = new CountDownLatch(i);
        final CountDownLatch freshHandoffLatch = new CountDownLatch(i);
        this.segmentLatch = freshSegmentLatch;
        this.handoffLatch = freshHandoffLatch;
    }

    /**
     * Waits up to one minute for {@code segmentLatch} to reach zero and fails the test on timeout.
     *
     * @return the {@code publishedSegments} collection (the live field, not a copy)
     * @throws InterruptedException if interrupted while waiting
     */
    private Collection<DataSegment> awaitSegments() throws InterruptedException {
        final boolean published = this.segmentLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertTrue("Timed out waiting for segments to be published", published);
        return this.publishedSegments;
    }

    /**
     * Waits up to one minute for {@code handoffLatch} to reach zero and fails the test on timeout.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void awaitHandoffs() throws InterruptedException {
        final boolean registered = this.handoffLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertTrue("Timed out waiting for segments to be handed off", registered);
    }

    /**
     * Wires up the task infrastructure used by the tests and stores it in this test's fields.
     *
     * <ul>
     *   <li>{@code taskStorage}: heap-backed task storage;</li>
     *   <li>{@code publishedSegments}: filled by a metadata coordinator subclass that also counts down
     *       {@code segmentLatch} whenever segments are announced;</li>
     *   <li>{@code handOffCallbacks}: filled by a handoff notifier that records each registered callback and
     *       counts down {@code handoffLatch};</li>
     *   <li>{@code taskLockbox}, {@code rowIngestionMetersFactory} and {@code taskToolboxFactory}.</li>
     * </ul>
     *
     * @param file directory whose path is passed to {@code TaskConfig} as its first constructor argument
     */
    private void makeToolboxFactory(File file) {
        this.taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig((Period) null));
        this.publishedSegments = new CopyOnWriteArrayList<>();

        DefaultObjectMapper mapper = new DefaultObjectMapper();
        mapper.registerSubtypes(LinearShardSpec.class);
        mapper.registerSubtypes(NumberedShardSpec.class);

        final String latchNotInitializedMessage =
                "Segment latch not initialized, did you forget to call expectPublishedSegments?";

        // Coordinator that behaves like the real one but also records what was published and signals the latch.
        IndexerSQLMetadataStorageCoordinator coordinator = new IndexerSQLMetadataStorageCoordinator(
                mapper,
                (MetadataStorageTablesConfig) this.derbyConnectorRule.metadataTablesConfigSupplier().get(),
                this.derbyConnectorRule.getConnector()) {
            @Override
            public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> set) throws IOException {
                Set<DataSegment> announced = super.announceHistoricalSegments(set);
                Assert.assertNotNull(latchNotInitializedMessage, AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch);
                AppenderatorDriverRealtimeIndexTaskTest.this.publishedSegments.addAll(announced);
                // One count per segment in the request.
                set.forEach(dataSegment -> AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch.countDown());
                return announced;
            }

            @Override
            public SegmentPublishResult announceHistoricalSegments(Set<DataSegment> set, DataSourceMetadata dataSourceMetadata, DataSourceMetadata dataSourceMetadata2) throws IOException {
                SegmentPublishResult publishResult = super.announceHistoricalSegments(set, dataSourceMetadata, dataSourceMetadata2);
                Assert.assertNotNull(latchNotInitializedMessage, AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch);
                AppenderatorDriverRealtimeIndexTaskTest.this.publishedSegments.addAll(publishResult.getSegments());
                // One count per segment reported in the publish result.
                publishResult.getSegments().forEach(dataSegment -> AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch.countDown());
                return publishResult;
            }
        };

        this.taskLockbox = new TaskLockbox(this.taskStorage, coordinator);
        TaskConfig taskConfig = new TaskConfig(file.getPath(), (String) null, (String) null, 50000, (List) null, true, (Period) null, (Period) null, (List) null);
        LocalTaskActionClientFactory actionClientFactory = new LocalTaskActionClientFactory(
                this.taskStorage,
                new TaskActionToolbox(this.taskLockbox, this.taskStorage, coordinator, EMITTER, (SupervisorManager) EasyMock.createMock(SupervisorManager.class)),
                new TaskAuditLogConfig(false));

        // Only timeseries queries are needed (see sumMetric); the query watcher is a no-op.
        DefaultQueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
                ImmutableMap.of(
                        TimeseriesQuery.class,
                        new TimeseriesQueryRunnerFactory(new TimeseriesQueryQueryToolChest(), new TimeseriesQueryEngine(), (query, listenableFuture) -> {
                        })));

        // Handoff never happens on its own: callbacks are captured so the tests can run them explicitly.
        this.handOffCallbacks = new ConcurrentHashMap<>();
        SegmentHandoffNotifierFactory handoffNotifierFactory = str -> new SegmentHandoffNotifier() {
            @Override
            public boolean registerSegmentHandoffCallback(SegmentDescriptor segmentDescriptor, Executor executor, Runnable runnable) {
                AppenderatorDriverRealtimeIndexTaskTest.this.handOffCallbacks.put(segmentDescriptor, new Pair<>(executor, runnable));
                AppenderatorDriverRealtimeIndexTaskTest.this.handoffLatch.countDown();
                return true;
            }

            @Override
            public void start() {
            }

            @Override
            public void close() {
            }
        };

        TestUtils testUtils = new TestUtils();
        this.rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();

        this.taskToolboxFactory = new TaskToolboxFactory(
                taskConfig,
                new DruidNode("druid/middlemanager", "localhost", false, 8091, (Integer) null, true, false),
                actionClientFactory,
                EMITTER,
                new TestDataSegmentPusher(),
                new TestDataSegmentKiller(),
                (DataSegmentMover) null,
                (DataSegmentArchiver) null,
                new TestDataSegmentAnnouncer(),
                (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
                handoffNotifierFactory,
                () -> conglomerate,
                Execs.directExecutor(),
                NoopJoinableFactory.INSTANCE,
                () -> (MonitorScheduler) EasyMock.createMock(MonitorScheduler.class),
                new SegmentLoaderFactory((IndexIO) null, testUtils.getTestObjectMapper()),
                testUtils.getTestObjectMapper(),
                testUtils.getTestIndexIO(),
                MapCache.create(1024L),
                new CacheConfig(),
                new CachePopulatorStats(),
                testUtils.getTestIndexMergerV9(),
                (DruidNodeAnnouncer) EasyMock.createNiceMock(DruidNodeAnnouncer.class),
                (DruidNode) EasyMock.createNiceMock(DruidNode.class),
                new LookupNodeService("tier"),
                new DataNodeService("tier", 1000L, ServerType.INDEXER_EXECUTOR, 0),
                new SingleFileTaskReportFileWriter(this.reportsFile),
                (IntermediaryDataManager) null);
    }

    /**
     * Runs a timeseries query over {@code test_ds} for the interval {@code 2000/3000} at {@code ALL}
     * granularity and returns the long sum of the given metric.
     *
     * @param task      task whose query runner answers the query
     * @param dimFilter filter applied to the query; callers in this class pass {@code null}
     * @param str       name of the metric; used both as the aggregator's name and as its input field
     * @return {@code 0} when the query yields no result rows, otherwise the metric value read from the
     *         first result row
     */
    @Nullable
    public Long sumMetric(Task task, DimFilter dimFilter, String str) {
        final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                .dataSource("test_ds")
                .filters(dimFilter)
                .aggregators(ImmutableList.of(new LongSumAggregatorFactory(str, str)))
                .granularity(Granularities.ALL)
                .intervals("2000/3000")
                .build();

        final List results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
        if (!results.isEmpty()) {
            final Result firstRow = (Result) results.get(0);
            final TimeseriesResultValue rowValue = (TimeseriesResultValue) firstRow.getValue();
            return rowValue.getLongMetric(str);
        }
        return 0L;
    }

    /**
     * Reads {@code reportsFile} as a map of task reports and extracts the ingestion stats-and-errors payload
     * from it.
     *
     * @return the payload obtained via {@code IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports}
     * @throws IOException if the reports file cannot be read or parsed
     */
    private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException {
        final Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
                this.reportsFile,
                new TypeReference<Map<String, TaskReport>>() {
                });
        return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(taskReports);
    }
}
