package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedShardSpecFactory;
import org.apache.druid.utils.CircularBuffer;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.class */
public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> extends AbstractTask implements ChatHandler {
    public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
    protected final DataSchema dataSchema;
    protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
    protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
    protected final Optional<ChatHandlerProvider> chatHandlerProvider;
    protected final Map<String, Object> context;
    protected final AuthorizerMapper authorizerMapper;
    protected final RowIngestionMetersFactory rowIngestionMetersFactory;
    protected final CircularBuffer<Throwable> savedParseExceptions;
    protected final AppenderatorsManager appenderatorsManager;
    protected final LockGranularity lockGranularityToUse;
    private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>> runnerSupplier;

    public SeekableStreamIndexTask(String str, @Nullable TaskResource taskResource, DataSchema dataSchema, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> seekableStreamIndexTaskIOConfig, @Nullable Map<String, Object> map, @Nullable ChatHandlerProvider chatHandlerProvider, AuthorizerMapper authorizerMapper, RowIngestionMetersFactory rowIngestionMetersFactory, @Nullable String str2, AppenderatorsManager appenderatorsManager) {
        super(str, str2, taskResource, dataSchema.getDataSource(), map);
        this.dataSchema = (DataSchema) Preconditions.checkNotNull(dataSchema, "dataSchema");
        this.tuningConfig = (SeekableStreamIndexTaskTuningConfig) Preconditions.checkNotNull(seekableStreamIndexTaskTuningConfig, "tuningConfig");
        this.ioConfig = (SeekableStreamIndexTaskIOConfig) Preconditions.checkNotNull(seekableStreamIndexTaskIOConfig, "ioConfig");
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
        if (seekableStreamIndexTaskTuningConfig.getMaxSavedParseExceptions() > 0) {
            this.savedParseExceptions = new CircularBuffer<>(seekableStreamIndexTaskTuningConfig.getMaxSavedParseExceptions());
        } else {
            this.savedParseExceptions = null;
        }
        this.context = map;
        this.authorizerMapper = authorizerMapper;
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
        this.appenderatorsManager = appenderatorsManager;
        this.lockGranularityToUse = ((Boolean) getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true)).booleanValue() ? LockGranularity.TIME_CHUNK : LockGranularity.SEGMENT;
    }

    private static String makeTaskId(String str, String str2) {
        return Joiner.on("_").join(str2, str, new Object[]{RandomIdUtils.getRandomId()});
    }

    protected static String getFormattedId(String str, String str2) {
        return makeTaskId(str, str2);
    }

    protected static String getFormattedGroupId(String str, String str2) {
        return StringUtils.format("%s_%s", new Object[]{str2, str});
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public int getPriority() {
        return ((Integer) getContextValue(Tasks.PRIORITY_KEY, 75)).intValue();
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @JsonProperty
    public DataSchema getDataSchema() {
        return this.dataSchema;
    }

    @JsonProperty
    public SeekableStreamIndexTaskTuningConfig getTuningConfig() {
        return this.tuningConfig;
    }

    @JsonProperty("ioConfig")
    public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIOConfig() {
        return this.ioConfig;
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public TaskStatus run(TaskToolbox taskToolbox) {
        return getRunner().run(taskToolbox);
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public boolean canRestore() {
        return true;
    }

    @Override // org.apache.druid.indexing.common.task.Task
    public void stopGracefully(TaskConfig taskConfig) {
        if (taskConfig.isRestoreTasksOnRestart()) {
            getRunner().stopGracefully();
        } else {
            getRunner().stopForcefully();
        }
    }

    @Override // org.apache.druid.indexing.common.task.AbstractTask, org.apache.druid.indexing.common.task.Task
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        return getRunner().getAppenderator() == null ? new NoopQueryRunner() : (queryPlus, responseContext) -> {
            return queryPlus.run(getRunner().getAppenderator(), responseContext);
        };
    }

    public Appenderator newAppenderator(FireDepartmentMetrics fireDepartmentMetrics, TaskToolbox taskToolbox) {
        return this.appenderatorsManager.createRealtimeAppenderatorForTask(getId(), this.dataSchema, this.tuningConfig.m89withBasePersistDirectory(taskToolbox.getPersistDir()), fireDepartmentMetrics, taskToolbox.getSegmentPusher(), taskToolbox.getObjectMapper(), taskToolbox.getIndexIO(), taskToolbox.getIndexMergerV9(), taskToolbox.getQueryRunnerFactoryConglomerate(), taskToolbox.getSegmentAnnouncer(), taskToolbox.getEmitter(), taskToolbox.getQueryExecutorService(), taskToolbox.getCache(), taskToolbox.getCacheConfig(), taskToolbox.getCachePopulatorStats());
    }

    public StreamAppenderatorDriver newDriver(Appenderator appenderator, TaskToolbox taskToolbox, FireDepartmentMetrics fireDepartmentMetrics) {
        return new StreamAppenderatorDriver(appenderator, new ActionBasedSegmentAllocator(taskToolbox.getTaskActionClient(), this.dataSchema, (dataSchema, inputRow, str, str2, z) -> {
            return new SegmentAllocateAction(dataSchema.getDataSource(), inputRow.getTimestamp(), dataSchema.getGranularitySpec().getQueryGranularity(), dataSchema.getGranularitySpec().getSegmentGranularity(), str, str2, z, NumberedShardSpecFactory.instance(), this.lockGranularityToUse);
        }), taskToolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient()), taskToolbox.getDataSegmentKiller(), taskToolbox.getObjectMapper(), fireDepartmentMetrics);
    }

    public boolean withinMinMaxRecordTime(InputRow inputRow) {
        boolean z = this.ioConfig.getMinimumMessageTime().isPresent() && ((DateTime) this.ioConfig.getMinimumMessageTime().get()).isAfter(inputRow.getTimestamp());
        boolean z2 = this.ioConfig.getMaximumMessageTime().isPresent() && ((DateTime) this.ioConfig.getMaximumMessageTime().get()).isBefore(inputRow.getTimestamp());
        if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
            throw new ParseException(StringUtils.format("Encountered row with timestamp that cannot be represented as a long: [%s]", new Object[]{inputRow}), new Object[0]);
        }
        if (log.isDebugEnabled()) {
            if (z) {
                log.debug("CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", new Object[]{inputRow.getTimestamp(), this.ioConfig.getMinimumMessageTime().get()});
            } else if (z2) {
                log.debug("CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", new Object[]{inputRow.getTimestamp(), this.ioConfig.getMaximumMessageTime().get()});
            }
        }
        return (z || z2) ? false : true;
    }

    protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> createTaskRunner();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract RecordSupplier<PartitionIdType, SequenceOffsetType> newTaskRecordSupplier();

    @VisibleForTesting
    public Appenderator getAppenderator() {
        return getRunner().getAppenderator();
    }

    @VisibleForTesting
    public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner() {
        return (SeekableStreamIndexTaskRunner) this.runnerSupplier.get();
    }
}
