package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.utils.CloseableUtils;

@Deprecated
/* loaded from: input_file:org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.class */
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler {
    private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
    private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);

    @JsonIgnore
    private final RealtimeAppenderatorIngestionSpec spec;

    @JsonIgnore
    private final Queue<ListenableFuture<SegmentsAndCommitMetadata>> pendingHandoffs;

    @JsonIgnore
    private volatile Appenderator appenderator;

    @JsonIgnore
    private volatile Firehose firehose;

    @JsonIgnore
    private volatile FireDepartmentMetrics metrics;

    @JsonIgnore
    private volatile boolean gracefullyStopped;

    @JsonIgnore
    private volatile boolean finishingJob;

    @JsonIgnore
    private volatile Thread runThread;

    @JsonIgnore
    private final LockGranularity lockGranularity;

    @JsonIgnore
    private ParseExceptionHandler parseExceptionHandler;

    @JsonIgnore
    private IngestionState ingestionState;

    @JsonIgnore
    private AuthorizerMapper authorizerMapper;

    @JsonIgnore
    private RowIngestionMeters rowIngestionMeters;

    @JsonIgnore
    private String errorMsg;

    private static String makeTaskId(RealtimeAppenderatorIngestionSpec realtimeAppenderatorIngestionSpec) {
        return StringUtils.format("index_realtime_%s_%d_%s_%s", new Object[]{realtimeAppenderatorIngestionSpec.getDataSchema().getDataSource(), Integer.valueOf(realtimeAppenderatorIngestionSpec.getTuningConfig().getShardSpec().getPartitionNum()), DateTimes.nowUtc(), RealtimeIndexTask.makeRandomId()});
    }

    @JsonCreator
    public AppenderatorDriverRealtimeIndexTask(@JsonProperty("id") String str, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("spec") RealtimeAppenderatorIngestionSpec realtimeAppenderatorIngestionSpec, @JsonProperty("context") Map<String, Object> map) {
        super(str == null ? makeTaskId(realtimeAppenderatorIngestionSpec) : str, StringUtils.format("index_realtime_appenderator_%s", new Object[]{realtimeAppenderatorIngestionSpec.getDataSchema().getDataSource()}), taskResource, realtimeAppenderatorIngestionSpec.getDataSchema().getDataSource(), map);
        this.appenderator = null;
        this.firehose = null;
        this.metrics = null;
        this.gracefullyStopped = false;
        this.finishingJob = false;
        this.runThread = null;
        this.spec = realtimeAppenderatorIngestionSpec;
        this.pendingHandoffs = new ConcurrentLinkedQueue();
        this.ingestionState = IngestionState.NOT_STARTED;
        this.lockGranularity = ((Boolean) getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true)).booleanValue() ? LockGranularity.TIME_CHUNK : LockGranularity.SEGMENT;
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public int getPriority() {
        return ((Integer) getContextValue(Tasks.PRIORITY_KEY, 75)).intValue();
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public String getType() {
        return "index_realtime_appenderator";
    }

    @Override // org.apache.druid.indexing.common.task.Task
    @JsonIgnore
    @Nonnull
    public Set<ResourceAction> getInputSourceResources() throws UOE {
        throw new UOE(StringUtils.format("Task type [%s], does not support input source based security", new Object[]{getType()}), new Object[0]);
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public String getNodeType() {
        return "realtime";
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        return this.appenderator == null ? new NoopQueryRunner() : (queryPlus, responseContext) -> {
            return queryPlus.run(this.appenderator, responseContext);
        };
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public boolean supportsQueries() {
        return true;
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask
    public TaskStatus runTask(TaskToolbox taskToolbox) {
        this.runThread = Thread.currentThread();
        this.authorizerMapper = taskToolbox.getAuthorizerMapper();
        this.rowIngestionMeters = taskToolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
        this.parseExceptionHandler = new ParseExceptionHandler(this.rowIngestionMeters, this.spec.getTuningConfig().isLogParseExceptions(), this.spec.getTuningConfig().getMaxParseExceptions(), this.spec.getTuningConfig().getMaxSavedParseExceptions());
        setupTimeoutAlert();
        DataSchema dataSchema = this.spec.getDataSchema();
        RealtimeAppenderatorTuningConfig m14withBasePersistDirectory = this.spec.getTuningConfig().m14withBasePersistDirectory(taskToolbox.getPersistDir());
        FireDepartment fireDepartment = new FireDepartment(dataSchema, new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null), (RealtimeTuningConfig) null);
        TaskRealtimeMetricsMonitor build = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartment, this.rowIngestionMeters);
        this.metrics = fireDepartment.getMetrics();
        Supplier<Committer> nilSupplier = Committers.nilSupplier();
        DiscoveryDruidNode createDiscoveryDruidNode = createDiscoveryDruidNode(taskToolbox);
        this.appenderator = newAppenderator(dataSchema, m14withBasePersistDirectory, this.metrics, taskToolbox);
        StreamAppenderatorDriver newDriver = newDriver(dataSchema, this.appenderator, taskToolbox, this.metrics, TaskLocks.determineLockTypeForAppend(getContext()));
        try {
            try {
                log.debug("Found chat handler of class[%s]", new Object[]{taskToolbox.getChatHandlerProvider().getClass().getName()});
                taskToolbox.getChatHandlerProvider().register(getId(), this, false);
                if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                    taskToolbox.getDataSegmentServerAnnouncer().announce();
                    taskToolbox.getDruidNodeAnnouncer().announce(createDiscoveryDruidNode);
                }
                newDriver.startJob(segmentIdWithShardSpec -> {
                    try {
                        if (this.lockGranularity == LockGranularity.SEGMENT) {
                            return ((LockResult) taskToolbox.getTaskActionClient().submit(new SegmentLockAcquireAction(TaskLockType.EXCLUSIVE, segmentIdWithShardSpec.getInterval(), segmentIdWithShardSpec.getVersion(), segmentIdWithShardSpec.getShardSpec().getPartitionNum(), 1000L))).isOk();
                        }
                        TaskLock taskLock = (TaskLock) taskToolbox.getTaskActionClient().submit(new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segmentIdWithShardSpec.getInterval(), 1000L));
                        if (taskLock == null) {
                            return false;
                        }
                        if (taskLock.isRevoked()) {
                            throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", new Object[]{segmentIdWithShardSpec.getInterval()}), new Object[0]);
                        }
                        return true;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                taskToolbox.addMonitor(build);
                FirehoseFactory firehoseFactory = this.spec.getIOConfig().getFirehoseFactory();
                boolean isFirehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
                int i = 0;
                String makeSequenceName = makeSequenceName(getId(), 0);
                TransactionalSegmentPublisher transactionalSegmentPublisher = (set, set2, obj) -> {
                    if (set == null || set.isEmpty()) {
                        return (SegmentPublishResult) taskToolbox.getTaskActionClient().submit(SegmentTransactionalInsertAction.appendAction(set2, null, null));
                    }
                    throw new ISE("Stream ingestion task unexpectedly attempted to overwrite segments: %s", new Object[]{SegmentUtils.commaSeparatedIdentifiers(set)});
                };
                synchronized (this) {
                    if (!this.gracefullyStopped) {
                        this.firehose = firehoseFactory.connect((InputRowParser) Preconditions.checkNotNull(this.spec.getDataSchema().getParser(), "inputRowParser"), taskToolbox.getIndexingTmpDir());
                    }
                }
                this.ingestionState = IngestionState.BUILD_SEGMENTS;
                while (!this.gracefullyStopped && isFirehoseDrainableByClosing && this.firehose.hasMore()) {
                    try {
                        InputRow nextRow = this.firehose.nextRow();
                        if (nextRow != null) {
                            AppenderatorDriverAddResult add = newDriver.add(nextRow, makeSequenceName, nilSupplier);
                            if (!add.isOk()) {
                                throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{nextRow.getTimestamp()});
                                break;
                            }
                            if (add.isPushRequired(m14withBasePersistDirectory.m15getPartitionsSpec().getMaxRowsPerSegment(), Long.valueOf(m14withBasePersistDirectory.m15getPartitionsSpec().getMaxTotalRowsOr(20000000L)))) {
                                publishSegments(newDriver, transactionalSegmentPublisher, nilSupplier, makeSequenceName);
                                i++;
                                makeSequenceName = makeSequenceName(getId(), i);
                            }
                        } else {
                            log.debug("Discarded null row, considering thrownAway.", new Object[0]);
                            this.rowIngestionMeters.incrementThrownAway();
                        }
                    } catch (ParseException e) {
                        handleParseException(e);
                    }
                }
                this.ingestionState = IngestionState.COMPLETED;
                if (!this.gracefullyStopped) {
                    synchronized (this) {
                        if (this.gracefullyStopped) {
                            log.info("Gracefully stopping.", new Object[0]);
                        } else {
                            this.finishingJob = true;
                        }
                    }
                    if (this.finishingJob) {
                        log.info("Finishing job...", new Object[0]);
                        publishSegments(newDriver, transactionalSegmentPublisher, nilSupplier, makeSequenceName);
                        waitForSegmentPublishAndHandoff(m14withBasePersistDirectory.getPublishAndHandoffTimeout());
                    }
                } else if (this.firehose != null) {
                    log.info("Task was gracefully stopped, will persist data before exiting", new Object[0]);
                    persistAndWait(newDriver, (Committer) nilSupplier.get());
                }
                taskToolbox.getChatHandlerProvider().unregister(getId());
                CloseableUtils.closeAndSuppressExceptions(this.firehose, th -> {
                    log.warn("Failed to close Firehose", new Object[0]);
                });
                this.appenderator.close();
                CloseableUtils.closeAndSuppressExceptions(newDriver, th2 -> {
                    log.warn("Failed to close AppenderatorDriver", new Object[0]);
                });
                taskToolbox.removeMonitor(build);
                if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                    taskToolbox.getDruidNodeAnnouncer().unannounce(createDiscoveryDruidNode);
                }
                log.info("Job done!", new Object[0]);
                taskToolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
                return TaskStatus.success(getId());
            } catch (Throwable th3) {
                log.makeAlert(th3, "Exception aborted realtime processing[%s]", new Object[]{dataSchema.getDataSource()}).emit();
                this.errorMsg = Throwables.getStackTraceAsString(th3);
                taskToolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
                TaskStatus failure = TaskStatus.failure(getId(), this.errorMsg);
                taskToolbox.getChatHandlerProvider().unregister(getId());
                CloseableUtils.closeAndSuppressExceptions(this.firehose, th4 -> {
                    log.warn("Failed to close Firehose", new Object[0]);
                });
                this.appenderator.close();
                CloseableUtils.closeAndSuppressExceptions(newDriver, th22 -> {
                    log.warn("Failed to close AppenderatorDriver", new Object[0]);
                });
                taskToolbox.removeMonitor(build);
                if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                    taskToolbox.getDruidNodeAnnouncer().unannounce(createDiscoveryDruidNode);
                }
                return failure;
            }
        } catch (Throwable th5) {
            taskToolbox.getChatHandlerProvider().unregister(getId());
            CloseableUtils.closeAndSuppressExceptions(this.firehose, th42 -> {
                log.warn("Failed to close Firehose", new Object[0]);
            });
            this.appenderator.close();
            CloseableUtils.closeAndSuppressExceptions(newDriver, th222 -> {
                log.warn("Failed to close AppenderatorDriver", new Object[0]);
            });
            taskToolbox.removeMonitor(build);
            if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                taskToolbox.getDruidNodeAnnouncer().unannounce(createDiscoveryDruidNode);
            }
            throw th5;
        }
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public boolean canRestore() {
        return true;
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public void stopGracefully(TaskConfig taskConfig) {
        if (!taskConfig.isRestoreTasksOnRestart()) {
            synchronized (this) {
                if (!this.gracefullyStopped) {
                    this.gracefullyStopped = true;
                    this.runThread.interrupt();
                }
            }
            return;
        }
        try {
            synchronized (this) {
                if (!this.gracefullyStopped) {
                    this.gracefullyStopped = true;
                    if (this.firehose == null) {
                        log.info("stopGracefully: Firehose not started yet, so nothing to stop.", new Object[0]);
                    } else if (this.finishingJob) {
                        log.info("stopGracefully: Interrupting finishJob.", new Object[0]);
                        this.runThread.interrupt();
                    } else if (isFirehoseDrainableByClosing(this.spec.getIOConfig().getFirehoseFactory())) {
                        log.info("stopGracefully: Draining firehose.", new Object[0]);
                        this.firehose.close();
                    } else {
                        log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.", new Object[0]);
                        this.runThread.interrupt();
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    @JsonIgnore
    public Firehose getFirehose() {
        return this.firehose;
    }

    @VisibleForTesting
    @JsonIgnore
    public FireDepartmentMetrics getMetrics() {
        return this.metrics;
    }

    @VisibleForTesting
    @JsonIgnore
    public RowIngestionMeters getRowIngestionMeters() {
        return this.rowIngestionMeters;
    }

    @JsonProperty("spec")
    public RealtimeAppenderatorIngestionSpec getSpec() {
        return this.spec;
    }

    @GET
    @Produces({"application/json"})
    @Path("/rowStats")
    public Response getRowStats(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        hashMap2.put("buildSegments", this.rowIngestionMeters.getTotals());
        hashMap3.put("buildSegments", this.rowIngestionMeters.getMovingAverages());
        hashMap.put("movingAverages", hashMap3);
        hashMap.put("totals", hashMap2);
        return Response.ok(hashMap).build();
    }

    @GET
    @Produces({"application/json"})
    @Path("/unparseableEvents")
    public Response getUnparseableEvents(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), this.authorizerMapper);
        return Response.ok(IndexTaskUtils.getReportListFromSavedParseExceptions(this.parseExceptionHandler.getSavedParseExceptionReports())).build();
    }

    protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
        return (firehoseFactory instanceof EventReceiverFirehoseFactory) || ((firehoseFactory instanceof TimedShutoffFirehoseFactory) && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory())) || ((firehoseFactory instanceof ClippedFirehoseFactory) && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
    }

    private Map<String, TaskReport> getTaskCompletionReports() {
        TaskReport[] taskReportArr = new TaskReport[1];
        taskReportArr[0] = new IngestionStatsAndErrorsTaskReport(getId(), new IngestionStatsAndErrorsTaskReportData(this.ingestionState, getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), this.errorMsg, this.errorMsg == null, 0L));
        return TaskReport.buildTaskReports(taskReportArr);
    }

    private Map<String, Object> getTaskCompletionUnparseableEvents() {
        HashMap hashMap = new HashMap();
        List<ParseExceptionReport> reportListFromSavedParseExceptions = IndexTaskUtils.getReportListFromSavedParseExceptions(this.parseExceptionHandler.getSavedParseExceptionReports());
        if (reportListFromSavedParseExceptions != null) {
            hashMap.put("buildSegments", reportListFromSavedParseExceptions);
        }
        return hashMap;
    }

    private Map<String, Object> getTaskCompletionRowStats() {
        HashMap hashMap = new HashMap();
        hashMap.put("buildSegments", this.rowIngestionMeters.getTotals());
        return hashMap;
    }

    private void handleParseException(ParseException parseException) {
        this.parseExceptionHandler.handle(parseException);
        if (this.rowIngestionMeters.getUnparseable() + this.rowIngestionMeters.getProcessedWithError() > this.spec.getTuningConfig().getMaxParseExceptions()) {
            log.error("Max parse exceptions exceeded, terminating task...", new Object[0]);
            throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
        }
    }

    private void setupTimeoutAlert() {
        if (this.spec.getTuningConfig().getAlertTimeout() > 0) {
            new Timer("RealtimeIndexTask-Timer", true).schedule(new TimerTask() { // from class: org.apache.druid.indexing.common.task.AppenderatorDriverRealtimeIndexTask.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    AppenderatorDriverRealtimeIndexTask.log.makeAlert("RealtimeIndexTask for dataSource [%s] hasn't finished in configured time [%d] ms.", new Object[]{AppenderatorDriverRealtimeIndexTask.this.spec.getDataSchema().getDataSource(), Long.valueOf(AppenderatorDriverRealtimeIndexTask.this.spec.getTuningConfig().getAlertTimeout())}).emit();
                }
            }, this.spec.getTuningConfig().getAlertTimeout());
        }
    }

    private void publishSegments(StreamAppenderatorDriver streamAppenderatorDriver, TransactionalSegmentPublisher transactionalSegmentPublisher, Supplier<Committer> supplier, String str) {
        ListenableFuture publish = streamAppenderatorDriver.publish(transactionalSegmentPublisher, (Committer) supplier.get(), Collections.singletonList(str));
        Queue<ListenableFuture<SegmentsAndCommitMetadata>> queue = this.pendingHandoffs;
        Objects.requireNonNull(streamAppenderatorDriver);
        queue.add(Futures.transformAsync(publish, streamAppenderatorDriver::registerHandoff, MoreExecutors.directExecutor()));
    }

    private void waitForSegmentPublishAndHandoff(long j) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.pendingHandoffs.isEmpty()) {
            return;
        }
        ListenableFuture allAsList = Futures.allAsList(this.pendingHandoffs);
        log.info("Waiting for handoffs", new Object[0]);
        if (j > 0) {
            allAsList.get(j, TimeUnit.MILLISECONDS);
        } else {
            allAsList.get();
        }
    }

    private void persistAndWait(StreamAppenderatorDriver streamAppenderatorDriver, final Committer committer) {
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            streamAppenderatorDriver.persist(new Committer() { // from class: org.apache.druid.indexing.common.task.AppenderatorDriverRealtimeIndexTask.2
                public Object getMetadata() {
                    return committer.getMetadata();
                }

                public void run() {
                    try {
                        committer.run();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
        } catch (InterruptedException e) {
            log.debug(e, "Interrupted while finishing the job", new Object[0]);
        } catch (Exception e2) {
            log.makeAlert(e2, "Failed to finish realtime task", new Object[0]).emit();
            throw e2;
        }
    }

    private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox taskToolbox) {
        LookupNodeService lookupNodeService = getContextValue("lookupTier") == null ? taskToolbox.getLookupNodeService() : new LookupNodeService((String) getContextValue("lookupTier"));
        return new DiscoveryDruidNode(taskToolbox.getDruidNode(), NodeRole.PEON, ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
    }

    private Appenderator newAppenderator(DataSchema dataSchema, RealtimeAppenderatorTuningConfig realtimeAppenderatorTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, TaskToolbox taskToolbox) {
        return taskToolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask((SegmentLoaderConfig) null, getId(), dataSchema, realtimeAppenderatorTuningConfig.m14withBasePersistDirectory(taskToolbox.getPersistDir()), fireDepartmentMetrics, taskToolbox.getSegmentPusher(), taskToolbox.getJsonMapper(), taskToolbox.getIndexIO(), taskToolbox.getIndexMergerV9(), taskToolbox.getQueryRunnerFactoryConglomerate(), taskToolbox.getSegmentAnnouncer(), taskToolbox.getEmitter(), taskToolbox.getQueryProcessingPool(), taskToolbox.getJoinableFactory(), taskToolbox.getCache(), taskToolbox.getCacheConfig(), taskToolbox.getCachePopulatorStats(), this.rowIngestionMeters, this.parseExceptionHandler, isUseMaxMemoryEstimates(), taskToolbox.getCentralizedTableSchemaConfig());
    }

    private static StreamAppenderatorDriver newDriver(DataSchema dataSchema, Appenderator appenderator, TaskToolbox taskToolbox, FireDepartmentMetrics fireDepartmentMetrics, TaskLockType taskLockType) {
        return new StreamAppenderatorDriver(appenderator, new ActionBasedSegmentAllocator(taskToolbox.getTaskActionClient(), dataSchema, (dataSchema2, inputRow, str, str2, z) -> {
            return new SegmentAllocateAction(dataSchema2.getDataSource(), inputRow.getTimestamp(), dataSchema2.getGranularitySpec().getQueryGranularity(), dataSchema2.getGranularitySpec().getSegmentGranularity(), str, str2, z, NumberedPartialShardSpec.instance(), LockGranularity.TIME_CHUNK, taskLockType);
        }), taskToolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient()), taskToolbox.getDataSegmentKiller(), taskToolbox.getJsonMapper(), fireDepartmentMetrics);
    }

    private static String makeSequenceName(String str, int i) {
        return str + "_" + i;
    }
}
