package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.FirehoseFactoryV2;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;

/* loaded from: input_file:org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.class */
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler {
    private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
    private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);

    @JsonIgnore
    private final RealtimeAppenderatorIngestionSpec spec;

    @JsonIgnore
    private final Queue<ListenableFuture<SegmentsAndMetadata>> pendingHandoffs;

    @JsonIgnore
    private volatile Appenderator appenderator;

    @JsonIgnore
    private volatile Firehose firehose;

    @JsonIgnore
    private volatile FireDepartmentMetrics metrics;

    @JsonIgnore
    private final RowIngestionMeters rowIngestionMeters;

    @JsonIgnore
    private volatile boolean gracefullyStopped;

    @JsonIgnore
    private volatile boolean finishingJob;

    @JsonIgnore
    private volatile Thread runThread;

    @JsonIgnore
    private CircularBuffer<Throwable> savedParseExceptions;

    @JsonIgnore
    private final Optional<ChatHandlerProvider> chatHandlerProvider;

    @JsonIgnore
    private final AuthorizerMapper authorizerMapper;

    @JsonIgnore
    private IngestionState ingestionState;

    @JsonIgnore
    private String errorMsg;

    private static String makeTaskId(RealtimeAppenderatorIngestionSpec realtimeAppenderatorIngestionSpec) {
        return StringUtils.format("index_realtime_%s_%d_%s_%s", new Object[]{realtimeAppenderatorIngestionSpec.getDataSchema().getDataSource(), Integer.valueOf(((RealtimeAppenderatorTuningConfig) realtimeAppenderatorIngestionSpec.getTuningConfig()).getShardSpec().getPartitionNum()), DateTimes.nowUtc(), RealtimeIndexTask.makeRandomId()});
    }

    @JsonCreator
    public AppenderatorDriverRealtimeIndexTask(@JsonProperty("id") String str, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("spec") RealtimeAppenderatorIngestionSpec realtimeAppenderatorIngestionSpec, @JsonProperty("context") Map<String, Object> map, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory) {
        super(str == null ? makeTaskId(realtimeAppenderatorIngestionSpec) : str, StringUtils.format("index_realtime_appenderator_%s", new Object[]{realtimeAppenderatorIngestionSpec.getDataSchema().getDataSource()}), taskResource, realtimeAppenderatorIngestionSpec.getDataSchema().getDataSource(), map);
        this.appenderator = null;
        this.firehose = null;
        this.metrics = null;
        this.gracefullyStopped = false;
        this.finishingJob = false;
        this.runThread = null;
        this.spec = realtimeAppenderatorIngestionSpec;
        this.pendingHandoffs = new ConcurrentLinkedQueue();
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
        this.authorizerMapper = authorizerMapper;
        if (((RealtimeAppenderatorTuningConfig) realtimeAppenderatorIngestionSpec.getTuningConfig()).getMaxSavedParseExceptions() > 0) {
            this.savedParseExceptions = new CircularBuffer<>(((RealtimeAppenderatorTuningConfig) realtimeAppenderatorIngestionSpec.getTuningConfig()).getMaxSavedParseExceptions());
        }
        this.ingestionState = IngestionState.NOT_STARTED;
        this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public int getPriority() {
        return ((Integer) getContextValue(Tasks.PRIORITY_KEY, 75)).intValue();
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public String getType() {
        return "index_realtime_appenderator";
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public String getNodeType() {
        return "realtime";
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        return this.appenderator == null ? new NoopQueryRunner() : (queryPlus, map) -> {
            return queryPlus.run(this.appenderator, map);
        };
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public TaskStatus run(TaskToolbox taskToolbox) throws Exception {
        this.runThread = Thread.currentThread();
        setupTimeoutAlert();
        DataSchema dataSchema = this.spec.getDataSchema();
        RealtimeAppenderatorTuningConfig withBasePersistDirectory = ((RealtimeAppenderatorTuningConfig) this.spec.getTuningConfig()).withBasePersistDirectory(taskToolbox.getPersistDir());
        FireDepartment fireDepartment = new FireDepartment(dataSchema, new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null, (FirehoseFactoryV2) null), (RealtimeTuningConfig) null);
        TaskRealtimeMetricsMonitor build = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartment, this.rowIngestionMeters);
        this.metrics = fireDepartment.getMetrics();
        Supplier<Committer> supplier = null;
        File firehoseTemporaryDir = taskToolbox.getFirehoseTemporaryDir();
        DiscoveryDruidNode createDiscoveryDruidNode = createDiscoveryDruidNode(taskToolbox);
        this.appenderator = newAppenderator(dataSchema, withBasePersistDirectory, this.metrics, taskToolbox);
        StreamAppenderatorDriver newDriver = newDriver(dataSchema, this.appenderator, taskToolbox, this.metrics);
        try {
            try {
                if (this.chatHandlerProvider.isPresent()) {
                    log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider) this.chatHandlerProvider.get()).getClass().getName()});
                    ((ChatHandlerProvider) this.chatHandlerProvider.get()).register(getId(), this, false);
                } else {
                    log.warn("No chat handler detected", new Object[0]);
                }
                taskToolbox.getDataSegmentServerAnnouncer().announce();
                taskToolbox.getDruidNodeAnnouncer().announce(createDiscoveryDruidNode);
                newDriver.startJob();
                taskToolbox.getMonitorScheduler().addMonitor(build);
                FileUtils.forceMkdir(firehoseTemporaryDir);
                FirehoseFactory firehoseFactory = this.spec.getIOConfig().getFirehoseFactory();
                boolean isFirehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
                int i = 0;
                String makeSequenceName = makeSequenceName(getId(), 0);
                TransactionalSegmentPublisher transactionalSegmentPublisher = (set, obj) -> {
                    return (SegmentPublishResult) taskToolbox.getTaskActionClient().submit(new SegmentTransactionalInsertAction(set));
                };
                synchronized (this) {
                    if (!this.gracefullyStopped) {
                        this.firehose = firehoseFactory.connect(this.spec.getDataSchema().getParser(), firehoseTemporaryDir);
                        supplier = Committers.supplierFromFirehose(this.firehose);
                    }
                }
                this.ingestionState = IngestionState.BUILD_SEGMENTS;
                while (!this.gracefullyStopped && isFirehoseDrainableByClosing && this.firehose.hasMore()) {
                    try {
                        InputRow nextRow = this.firehose.nextRow();
                        if (nextRow != null) {
                            AppenderatorDriverAddResult add = newDriver.add(nextRow, makeSequenceName, supplier);
                            if (!add.isOk()) {
                                throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{nextRow.getTimestamp()});
                                break;
                            }
                            if (add.isPushRequired(withBasePersistDirectory)) {
                                publishSegments(newDriver, transactionalSegmentPublisher, supplier, makeSequenceName);
                                i++;
                                makeSequenceName = makeSequenceName(getId(), i);
                            }
                            if (add.getParseException() != null) {
                                handleParseException(add.getParseException());
                            } else {
                                this.rowIngestionMeters.incrementProcessed();
                            }
                        } else {
                            log.debug("Discarded null row, considering thrownAway.", new Object[0]);
                            this.rowIngestionMeters.incrementThrownAway();
                        }
                    } catch (ParseException e) {
                        handleParseException(e);
                    }
                }
                this.ingestionState = IngestionState.COMPLETED;
                if (!this.gracefullyStopped) {
                    synchronized (this) {
                        if (this.gracefullyStopped) {
                            log.info("Gracefully stopping.", new Object[0]);
                        } else {
                            this.finishingJob = true;
                        }
                    }
                    if (this.finishingJob) {
                        log.info("Finishing job...", new Object[0]);
                        publishSegments(newDriver, transactionalSegmentPublisher, supplier, makeSequenceName);
                        waitForSegmentPublishAndHandoff(withBasePersistDirectory.getPublishAndHandoffTimeout());
                    }
                } else if (this.firehose != null) {
                    log.info("Task was gracefully stopped, will persist data before exiting", new Object[0]);
                    persistAndWait(newDriver, (Committer) supplier.get());
                }
                if (this.chatHandlerProvider.isPresent()) {
                    ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
                }
                CloseQuietly.close(this.firehose);
                CloseQuietly.close(this.appenderator);
                CloseQuietly.close(newDriver);
                taskToolbox.getMonitorScheduler().removeMonitor(build);
                taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                taskToolbox.getDruidNodeAnnouncer().unannounce(createDiscoveryDruidNode);
                log.info("Job done!", new Object[0]);
                taskToolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
                return TaskStatus.success(getId());
            } catch (Throwable th) {
                log.makeAlert(th, "Exception aborted realtime processing[%s]", new Object[]{dataSchema.getDataSource()}).emit();
                this.errorMsg = Throwables.getStackTraceAsString(th);
                taskToolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
                TaskStatus failure = TaskStatus.failure(getId(), this.errorMsg);
                if (this.chatHandlerProvider.isPresent()) {
                    ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
                }
                CloseQuietly.close(this.firehose);
                CloseQuietly.close(this.appenderator);
                CloseQuietly.close(newDriver);
                taskToolbox.getMonitorScheduler().removeMonitor(build);
                taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                taskToolbox.getDruidNodeAnnouncer().unannounce(createDiscoveryDruidNode);
                return failure;
            }
        } catch (Throwable th2) {
            if (this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
            }
            CloseQuietly.close(this.firehose);
            CloseQuietly.close(this.appenderator);
            CloseQuietly.close(newDriver);
            taskToolbox.getMonitorScheduler().removeMonitor(build);
            taskToolbox.getDataSegmentServerAnnouncer().unannounce();
            taskToolbox.getDruidNodeAnnouncer().unannounce(createDiscoveryDruidNode);
            throw th2;
        }
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public boolean canRestore() {
        return true;
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public void stopGracefully() {
        try {
            synchronized (this) {
                if (!this.gracefullyStopped) {
                    this.gracefullyStopped = true;
                    if (this.firehose == null) {
                        log.info("stopGracefully: Firehose not started yet, so nothing to stop.", new Object[0]);
                    } else if (this.finishingJob) {
                        log.info("stopGracefully: Interrupting finishJob.", new Object[0]);
                        this.runThread.interrupt();
                    } else if (isFirehoseDrainableByClosing(this.spec.getIOConfig().getFirehoseFactory())) {
                        log.info("stopGracefully: Draining firehose.", new Object[0]);
                        this.firehose.close();
                    } else {
                        log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.", new Object[0]);
                        this.runThread.interrupt();
                    }
                }
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @VisibleForTesting
    @JsonIgnore
    public Firehose getFirehose() {
        return this.firehose;
    }

    @VisibleForTesting
    @JsonIgnore
    public FireDepartmentMetrics getMetrics() {
        return this.metrics;
    }

    @VisibleForTesting
    @JsonIgnore
    public RowIngestionMeters getRowIngestionMeters() {
        return this.rowIngestionMeters;
    }

    @JsonProperty("spec")
    public RealtimeAppenderatorIngestionSpec getSpec() {
        return this.spec;
    }

    @GET
    @Produces({"application/json"})
    @Path("/rowStats")
    public Response getRowStats(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        HashMap newHashMap = Maps.newHashMap();
        HashMap newHashMap2 = Maps.newHashMap();
        HashMap newHashMap3 = Maps.newHashMap();
        newHashMap2.put(RowIngestionMeters.BUILD_SEGMENTS, this.rowIngestionMeters.getTotals());
        newHashMap3.put(RowIngestionMeters.BUILD_SEGMENTS, this.rowIngestionMeters.getMovingAverages());
        newHashMap.put("movingAverages", newHashMap3);
        newHashMap.put("totals", newHashMap2);
        return Response.ok(newHashMap).build();
    }

    @GET
    @Produces({"application/json"})
    @Path("/unparseableEvents")
    public Response getUnparseableEvents(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        return Response.ok(IndexTaskUtils.getMessagesFromSavedParseExceptions(this.savedParseExceptions)).build();
    }

    protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
        return (firehoseFactory instanceof EventReceiverFirehoseFactory) || ((firehoseFactory instanceof TimedShutoffFirehoseFactory) && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory())) || ((firehoseFactory instanceof ClippedFirehoseFactory) && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
    }

    private Map<String, TaskReport> getTaskCompletionReports() {
        return TaskReport.buildTaskReports(new IngestionStatsAndErrorsTaskReport(getId(), new IngestionStatsAndErrorsTaskReportData(this.ingestionState, getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), this.errorMsg)));
    }

    private Map<String, Object> getTaskCompletionUnparseableEvents() {
        HashMap newHashMap = Maps.newHashMap();
        List<String> messagesFromSavedParseExceptions = IndexTaskUtils.getMessagesFromSavedParseExceptions(this.savedParseExceptions);
        if (messagesFromSavedParseExceptions != null) {
            newHashMap.put(RowIngestionMeters.BUILD_SEGMENTS, messagesFromSavedParseExceptions);
        }
        return newHashMap;
    }

    private Map<String, Object> getTaskCompletionRowStats() {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(RowIngestionMeters.BUILD_SEGMENTS, this.rowIngestionMeters.getTotals());
        return newHashMap;
    }

    private void handleParseException(ParseException parseException) {
        if (parseException.isFromPartiallyValidRow()) {
            this.rowIngestionMeters.incrementProcessedWithError();
        } else {
            this.rowIngestionMeters.incrementUnparseable();
        }
        if (((RealtimeAppenderatorTuningConfig) this.spec.getTuningConfig()).isLogParseExceptions()) {
            log.error(parseException, "Encountered parse exception: ", new Object[0]);
        }
        if (this.savedParseExceptions != null) {
            this.savedParseExceptions.add(parseException);
        }
        if (this.rowIngestionMeters.getUnparseable() + this.rowIngestionMeters.getProcessedWithError() > ((RealtimeAppenderatorTuningConfig) this.spec.getTuningConfig()).getMaxParseExceptions()) {
            log.error("Max parse exceptions exceeded, terminating task...", new Object[0]);
            throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
        }
    }

    private void setupTimeoutAlert() {
        if (((RealtimeAppenderatorTuningConfig) this.spec.getTuningConfig()).getAlertTimeout() > 0) {
            new Timer("RealtimeIndexTask-Timer", true).schedule(new TimerTask() { // from class: org.apache.druid.indexing.common.task.AppenderatorDriverRealtimeIndexTask.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    AppenderatorDriverRealtimeIndexTask.log.makeAlert("RealtimeIndexTask for dataSource [%s] hasn't finished in configured time [%d] ms.", new Object[]{AppenderatorDriverRealtimeIndexTask.this.spec.getDataSchema().getDataSource(), Long.valueOf(((RealtimeAppenderatorTuningConfig) AppenderatorDriverRealtimeIndexTask.this.spec.getTuningConfig()).getAlertTimeout())}).emit();
                }
            }, ((RealtimeAppenderatorTuningConfig) this.spec.getTuningConfig()).getAlertTimeout());
        }
    }

    private void publishSegments(StreamAppenderatorDriver streamAppenderatorDriver, TransactionalSegmentPublisher transactionalSegmentPublisher, Supplier<Committer> supplier, String str) {
        ListenableFuture publish = streamAppenderatorDriver.publish(transactionalSegmentPublisher, (Committer) supplier.get(), Collections.singletonList(str));
        Queue<ListenableFuture<SegmentsAndMetadata>> queue = this.pendingHandoffs;
        streamAppenderatorDriver.getClass();
        queue.add(ListenableFutures.transformAsync(publish, streamAppenderatorDriver::registerHandoff));
    }

    private void waitForSegmentPublishAndHandoff(long j) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.pendingHandoffs.isEmpty()) {
            return;
        }
        ListenableFuture allAsList = Futures.allAsList(this.pendingHandoffs);
        log.info("Waiting for handoffs", new Object[0]);
        if (j > 0) {
            allAsList.get(j, TimeUnit.MILLISECONDS);
        } else {
            allAsList.get();
        }
    }

    private void persistAndWait(StreamAppenderatorDriver streamAppenderatorDriver, final Committer committer) {
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            streamAppenderatorDriver.persist(new Committer() { // from class: org.apache.druid.indexing.common.task.AppenderatorDriverRealtimeIndexTask.2
                public Object getMetadata() {
                    return committer.getMetadata();
                }

                public void run() {
                    try {
                        committer.run();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
        } catch (InterruptedException e) {
            log.debug(e, "Interrupted while finishing the job", new Object[0]);
        } catch (Exception e2) {
            log.makeAlert(e2, "Failed to finish realtime task", new Object[0]).emit();
            throw e2;
        }
    }

    private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox taskToolbox) {
        LookupNodeService lookupNodeService = getContextValue("lookupTier") == null ? taskToolbox.getLookupNodeService() : new LookupNodeService((String) getContextValue("lookupTier"));
        return new DiscoveryDruidNode(taskToolbox.getDruidNode(), "peon", ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
    }

    private static Appenderator newAppenderator(DataSchema dataSchema, RealtimeAppenderatorTuningConfig realtimeAppenderatorTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, TaskToolbox taskToolbox) {
        return Appenderators.createRealtime(dataSchema, realtimeAppenderatorTuningConfig.withBasePersistDirectory(taskToolbox.getPersistDir()), fireDepartmentMetrics, taskToolbox.getSegmentPusher(), taskToolbox.getObjectMapper(), taskToolbox.getIndexIO(), taskToolbox.getIndexMergerV9(), taskToolbox.getQueryRunnerFactoryConglomerate(), taskToolbox.getSegmentAnnouncer(), taskToolbox.getEmitter(), taskToolbox.getQueryExecutorService(), taskToolbox.getCache(), taskToolbox.getCacheConfig(), taskToolbox.getCachePopulatorStats());
    }

    private static StreamAppenderatorDriver newDriver(DataSchema dataSchema, Appenderator appenderator, TaskToolbox taskToolbox, FireDepartmentMetrics fireDepartmentMetrics) {
        return new StreamAppenderatorDriver(appenderator, new ActionBasedSegmentAllocator(taskToolbox.getTaskActionClient(), dataSchema, (dataSchema2, inputRow, str, str2, z) -> {
            return new SegmentAllocateAction(dataSchema2.getDataSource(), inputRow.getTimestamp(), dataSchema2.getGranularitySpec().getQueryGranularity(), dataSchema2.getGranularitySpec().getSegmentGranularity(), str, str2, z);
        }), taskToolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient()), taskToolbox.getDataSegmentKiller(), taskToolbox.getObjectMapper(), fireDepartmentMetrics);
    }

    private static String makeSequenceName(String str, int i) {
        return str + "_" + i;
    }
}
