package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.class */
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler {
    /** Task type name; returned by {@code getType()} and used when generating a task id in the constructor. */
    public static final String TYPE = "single_phase_sub_task";
    // Not referenced anywhere else in this class.
    // NOTE(review): presumably a legacy type name kept for compatibility — confirm where it is registered.
    public static final String OLD_TYPE_NAME = "index_sub";
    private static final Logger LOG = new Logger(SinglePhaseSubTask.class);
    // Value of the "numAttempts" JSON property; this class only stores it and exposes it via getNumAttempts().
    private final int numAttempts;
    // Ingestion spec (JSON property "spec"): the source of the data schema, IO config and tuning config used below.
    private final ParallelIndexIngestionSpec ingestionSchema;
    // Value of the "subtaskSpecId" JSON property; may be null. Used as the sequence name for segment allocation
    // when lineage-based segment allocation is enabled in the task context.
    private final String subtaskSpecId;
    // True when the spec is not appending to existing data AND its granularitySpec has no input intervals.
    // In that case the constructor puts Tasks.FORCE_TIME_CHUNK_LOCK_KEY=true into the task context.
    private final boolean missingIntervalsInOverwriteMode;
    // The three fields below are assigned in runTask() and are null before it has run; the HTTP endpoint
    // methods of this class read them.
    private AuthorizerMapper authorizerMapper;
    private RowIngestionMeters rowIngestionMeters;
    private ParseExceptionHandler parseExceptionHandler;

    // Stack trace (as a string) of the exception caught by runTask(); stays null if no exception was caught.
    @Nullable
    private String errorMsg;
    // NOT_STARTED after construction, BUILD_SEGMENTS once runTask() is about to generate segments.
    // This class never assigns any other state to it.
    private IngestionState ingestionState;

    /**
     * Creates the sub task from its JSON properties.
     *
     * <p>When the spec does not append to existing data and its granularitySpec has no input intervals,
     * {@code Tasks.FORCE_TIME_CHUNK_LOCK_KEY} is set to {@code true} in the task context.
     *
     * @param str                        JSON "id"; may be null, in which case {@code getOrMakeId} supplies one
     * @param str2                       JSON "groupId"
     * @param taskResource               JSON "resource"
     * @param str3                       JSON "supervisorTaskId"
     * @param str4                       JSON "subtaskSpecId"; may be null
     * @param i                          JSON "numAttempts"
     * @param parallelIndexIngestionSpec JSON "spec"; dereferenced immediately, so it must not be null
     * @param map                        JSON "context"
     * @throws UnsupportedOperationException if the tuning config forces guaranteed rollup
     */
    @JsonCreator
    public SinglePhaseSubTask(
            @JsonProperty("id") @Nullable String str,
            @JsonProperty("groupId") String str2,
            @JsonProperty("resource") TaskResource taskResource,
            @JsonProperty("supervisorTaskId") String str3,
            @JsonProperty("subtaskSpecId") @Nullable String str4,
            @JsonProperty("numAttempts") int i,
            @JsonProperty("spec") ParallelIndexIngestionSpec parallelIndexIngestionSpec,
            @JsonProperty("context") Map<String, Object> map
    ) {
        super(
                getOrMakeId(str, TYPE, parallelIndexIngestionSpec.getDataSchema().getDataSource()),
                str2,
                taskResource,
                parallelIndexIngestionSpec.getDataSchema().getDataSource(),
                map,
                AbstractTask.computeBatchIngestionMode(parallelIndexIngestionSpec.m44getIOConfig()),
                str3
        );

        if (parallelIndexIngestionSpec.m43getTuningConfig().isForceGuaranteedRollup()) {
            throw new UnsupportedOperationException("Guaranteed rollup is not supported");
        }

        this.ingestionSchema = parallelIndexIngestionSpec;
        this.subtaskSpecId = str4;
        this.numAttempts = i;
        this.ingestionState = IngestionState.NOT_STARTED;

        // The input intervals are only inspected when the task is not appending.
        if (parallelIndexIngestionSpec.m44getIOConfig().isAppendToExisting()) {
            this.missingIntervalsInOverwriteMode = false;
        } else {
            this.missingIntervalsInOverwriteMode =
                    parallelIndexIngestionSpec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
        }

        if (this.missingIntervalsInOverwriteMode) {
            addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
        }
    }

    /**
     * Returns the task type name of this task.
     *
     * @return the {@code TYPE} constant, {@code "single_phase_sub_task"}
     */
    @Override // org.apache.druid.indexing.common.task.Task
    public String getType() {
        return SinglePhaseSubTask.TYPE;
    }

    /**
     * Lists the resource actions needed to read this task's input source.
     *
     * @return one READ action on an {@code "EXTERNAL"} resource per type reported by the input source,
     *         or an empty set when the IO config has no input source
     * @throws RuntimeException the error produced by {@code getInputSecurityOnFirehoseUnsupportedError()}
     *                          when the IO config carries a firehose factory
     */
    @Override // org.apache.druid.indexing.common.task.Task
    @Nonnull
    @JsonIgnore
    public Set<ResourceAction> getInputSourceResources() {
        if (getIngestionSchema().m44getIOConfig().getFirehoseFactory() != null) {
            throw getInputSecurityOnFirehoseUnsupportedError();
        }
        if (getIngestionSchema().m44getIOConfig().getInputSource() == null) {
            return ImmutableSet.of();
        }
        return getIngestionSchema().m44getIOConfig().getInputSource().getTypes()
                .stream()
                .map(type -> new ResourceAction(new Resource(type, "EXTERNAL"), Action.READ))
                .collect(Collectors.toSet());
    }

    /**
     * Determines the lock granularity and tries to take locks for the spec's input intervals.
     * Task actions are issued through a {@link SurrogateTaskActionClient} built with the supervisor task id.
     *
     * @param taskActionClient client wrapped by the surrogate client
     * @return the result of {@code determineLockGranularityAndTryLock}
     * @throws IOException propagated from {@code determineLockGranularityAndTryLock}
     */
    @Override // org.apache.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) throws IOException {
        final SurrogateTaskActionClient surrogateClient =
                new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient);
        return determineLockGranularityAndTryLock(
                surrogateClient,
                ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
        );
    }

    /**
     * Returns the value supplied as the "numAttempts" JSON property at construction.
     *
     * @return the stored attempt count
     */
    @JsonProperty
    public int getNumAttempts() {
        return numAttempts;
    }

    /**
     * Returns the ingestion spec of this task; serialized as the "spec" JSON property.
     *
     * @return the spec passed to the constructor
     */
    @JsonProperty("spec")
    public ParallelIndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /**
     * Returns the value supplied as the "subtaskSpecId" JSON property at construction.
     *
     * @return the sub task spec id; may be null because the constructor accepts null
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask
    @JsonProperty
    public String getSubtaskSpecId() {
        return subtaskSpecId;
    }

    /**
     * Runs the sub task: registers it as a chat handler, generates and pushes segments from the input
     * source, reports the pushed segments to the supervisor task and writes the task report.
     *
     * <p>An {@link Exception} raised while doing so is logged, its stack trace is stored in
     * {@code errorMsg}, a report is written and a failure status is returned instead of propagating.
     * The chat handler is unregistered exactly once, in {@code finally}, on every exit path; a failure of
     * that cleanup propagates to the caller instead of being converted into a task failure.
     *
     * @param taskToolbox toolbox providing the chat handler provider, metrics factory, supervisor task
     *                    client, temp dir and report writer used here
     * @return a success status, or a failure status carrying the stack trace of the caught exception
     * @throws Exception if writing the failure report or unregistering the chat handler fails
     */
    @Override // org.apache.druid.indexing.common.task.AbstractTask
    public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
        try {
            if (missingIntervalsInOverwriteMode) {
                LOG.warn("Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. Forced to use timeChunk lock.");
            }
            authorizerMapper = taskToolbox.getAuthorizerMapper();
            taskToolbox.getChatHandlerProvider().register(getId(), this, false);

            rowIngestionMeters = taskToolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
            final ParallelIndexTuningConfig tuningConfig = ingestionSchema.m43getTuningConfig();
            parseExceptionHandler = new ParseExceptionHandler(
                    rowIngestionMeters,
                    tuningConfig.isLogParseExceptions(),
                    tuningConfig.getMaxParseExceptions(),
                    tuningConfig.getMaxSavedParseExceptions()
            );

            final InputSource inputSource = ingestionSchema.m44getIOConfig().getNonNullInputSource(taskToolbox);
            final ParallelIndexSupervisorTaskClient supervisorClient = taskToolbox.getSupervisorTaskClientProvider().build(
                    getSupervisorTaskId(),
                    tuningConfig.getChatHandlerTimeout(),
                    tuningConfig.getChatHandlerNumRetries()
            );

            ingestionState = IngestionState.BUILD_SEGMENTS;
            final DataSegmentsWithSchemas pushed =
                    generateAndPushSegments(taskToolbox, supervisorClient, inputSource, taskToolbox.getIndexingTmpDir());

            // Put the locked existing segments and the newly pushed ones on one timeline and collect
            // whatever that timeline reports as fully overshadowed.
            final Set<DataSegment> allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments());
            allSegments.addAll(pushed.getSegments());
            final Set<DataSegment> overshadowedSegments =
                    FluentIterable.from(SegmentTimeline.forSegments(allSegments).findFullyOvershadowed())
                                  .transformAndConcat(holder -> holder.getObject())
                                  .transform(chunk -> chunk.getObject())
                                  .toSet();

            final TaskReport.ReportMap completionReports = getTaskCompletionReports();
            supervisorClient.report(
                    new PushedSegmentsReport(
                            getId(),
                            overshadowedSegments,
                            pushed.getSegments(),
                            completionReports,
                            pushed.getSegmentSchemaMapping()
                    )
            );
            taskToolbox.getTaskReportFileWriter().write(getId(), completionReports);
            return TaskStatus.success(getId());
        } catch (Exception e) {
            LOG.error(e, "Encountered exception in parallel sub task.");
            errorMsg = Throwables.getStackTraceAsString(e);
            taskToolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
            return TaskStatus.failure(getId(), errorMsg);
        } finally {
            taskToolbox.getChatHandlerProvider().unregister(getId());
        }
    }

    /**
     * Tells whether existing segments have to be locked.
     *
     * @return {@code true} unless the ingestion mode is {@code APPEND}
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    public boolean requireLockExistingSegments() {
        final boolean appending = getIngestionMode() == AbstractTask.IngestionMode.APPEND;
        return !appending;
    }

    /**
     * Finds the segments to lock by delegating to {@code findInputSegments} for this task's datasource.
     *
     * @param taskActionClient client handed to {@code findInputSegments}
     * @param list             intervals handed to {@code findInputSegments}
     * @return whatever {@code findInputSegments} returns for those arguments
     * @throws IOException propagated from {@code findInputSegments}
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> list) throws IOException {
        final String dataSource = getDataSource();
        return findInputSegments(dataSource, taskActionClient, list);
    }

    /**
     * Always reports that this task does not do perfect rollup; the constructor rejects specs that force
     * guaranteed rollup.
     *
     * @return {@code false}
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    public boolean isPerfectRollup() {
        return false;
    }

    /**
     * Returns the segment granularity of the spec's granularitySpec.
     *
     * @return {@code null} when the granularitySpec is an {@link ArbitraryGranularitySpec}, otherwise the
     *         spec's segment granularity
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    @Nullable
    public Granularity getSegmentGranularity() {
        final GranularitySpec spec = ingestionSchema.getDataSchema().getGranularitySpec();
        return spec instanceof ArbitraryGranularitySpec ? null : spec.getSegmentGranularity();
    }

    /**
     * Reads every row of the input source, adds it to a batch appenderator driver and pushes segments,
     * both whenever the driver reports that a push is required and once more after the last row.
     *
     * <p>The driver and the row iterator are managed by try-with-resources. In {@code finally} the
     * appenderator is shut down with {@code closeNow()} when an exception was caught and with
     * {@code close()} otherwise, and the metrics monitor is removed from the toolbox.
     *
     * @param taskToolbox                       toolbox used to build the allocator, appenderator and driver
     * @param parallelIndexSupervisorTaskClient client for the supervisor task, used for segment allocation
     * @param inputSource                       source of the rows to index
     * @param file                              temporary directory handed to the input source reader
     * @return all pushed segments, together with the merged segment schema mapping or {@code null} when
     *         that mapping is empty
     * @throws IOException          propagated from reading the input or closing resources
     * @throws InterruptedException propagated from the driver
     * @throws RuntimeException     wrapping an {@link ExecutionException} or {@link TimeoutException}
     * @throws ISE                  if the driver reports that a row could not be added
     */
    private DataSegmentsWithSchemas generateAndPushSegments(TaskToolbox taskToolbox, ParallelIndexSupervisorTaskClient parallelIndexSupervisorTaskClient, InputSource inputSource, File file) throws IOException, InterruptedException {
        final DataSchema dataSchema = ingestionSchema.getDataSchema();
        final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();

        // This FireDepartment is built without IO or tuning config; only its metrics object is used below.
        final FireDepartment fireDepartmentForMetrics = new FireDepartment(dataSchema, new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null), (RealtimeTuningConfig) null);
        final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
        final TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics, rowIngestionMeters);
        taskToolbox.addMonitor(metricsMonitor);

        final ParallelIndexTuningConfig tuningConfig = ingestionSchema.m43getTuningConfig();
        final DynamicPartitionsSpec partitionsSpec = tuningConfig.getGivenOrDefaultPartitionsSpec();
        final long pushTimeout = tuningConfig.getPushTimeout();
        final boolean useLineageBasedSegmentAllocation = getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, false);
        // With lineage-based allocation the sub task spec id is the sequence name, otherwise the task id is.
        final String sequenceName = useLineageBasedSegmentAllocation
                ? Preconditions.checkNotNull(subtaskSpecId, "subtaskSpecId")
                : getId();
        final SegmentAllocatorForBatch segmentAllocator = SegmentAllocators.forLinearPartitioning(
                taskToolbox,
                sequenceName,
                new SupervisorTaskAccess(getSupervisorTaskId(), parallelIndexSupervisorTaskClient),
                getIngestionSchema().getDataSchema(),
                getTaskLockHelper(),
                getIngestionMode(),
                partitionsSpec,
                useLineageBasedSegmentAllocation
        );
        final boolean useMaxMemoryEstimates = getContextValue(Tasks.USE_MAX_MEMORY_ESTIMATES, false);
        final Appenderator appenderator = BatchAppenderators.newAppenderator(
                getId(),
                taskToolbox.getAppenderatorsManager(),
                fireDepartmentMetrics,
                taskToolbox,
                dataSchema,
                tuningConfig,
                rowIngestionMeters,
                parseExceptionHandler,
                useMaxMemoryEstimates
        );

        boolean exceptionOccurred = false;
        // Resources close in reverse declaration order: the row iterator first, then the driver.
        try (
                BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, taskToolbox, segmentAllocator);
                FilteringCloseableInputRowIterator inputRows = AbstractBatchIndexTask.inputSourceReader(
                        file,
                        dataSchema,
                        inputSource,
                        inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
                        allowNonNullRowsWithinInputIntervalsOf(granularitySpec),
                        rowIngestionMeters,
                        parseExceptionHandler
                )
        ) {
            driver.startJob();

            final Set<DataSegment> pushedSegments = new HashSet<>();
            final SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(1);

            while (inputRows.hasNext()) {
                final InputRow inputRow = inputRows.next();
                final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
                if (!addResult.isOk()) {
                    throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
                }
                // NOTE(review): 20000000L is the fallback when maxTotalRows is unset; presumably it mirrors
                // the partitions spec's own default — confirm.
                if (addResult.isPushRequired(partitionsSpec.getMaxRowsPerSegment(), partitionsSpec.getMaxTotalRowsOr(20000000L))) {
                    recordPushedSegments(driver.pushAllAndClear(pushTimeout), pushedSegments, segmentSchemaMapping);
                }
                fireDepartmentMetrics.incrementProcessed();
            }

            recordPushedSegments(driver.pushAllAndClear(pushTimeout), pushedSegments, segmentSchemaMapping);

            // Closed here, before the driver is closed; the finally block calls close() a second time on
            // the success path, as the original code did.
            appenderator.close();

            return new DataSegmentsWithSchemas(pushedSegments, segmentSchemaMapping.isNonEmpty() ? segmentSchemaMapping : null);
        } catch (ExecutionException | TimeoutException e) {
            exceptionOccurred = true;
            throw new RuntimeException(e);
        } catch (Exception e) {
            exceptionOccurred = true;
            throw e;
        } finally {
            if (exceptionOccurred) {
                appenderator.closeNow();
            } else {
                appenderator.close();
            }
            taskToolbox.removeMonitor(metricsMonitor);
        }
    }

    /** Adds the segments and schemas of one push to the running totals and logs them. */
    private static void recordPushedSegments(SegmentsAndCommitMetadata pushed, Set<DataSegment> pushedSegments, SegmentSchemaMapping segmentSchemaMapping) {
        pushedSegments.addAll(pushed.getSegments());
        segmentSchemaMapping.merge(pushed.getSegmentSchemaMapping());
        LOG.info("Pushed [%s] segments and [%s] schemas", pushed.getSegments().size(), segmentSchemaMapping.getSchemaCount());
        LOG.infoSegments(pushed.getSegments(), "Pushed segments");
        LOG.info("SegmentSchema is [%s]", segmentSchemaMapping);
    }

    /**
     * HTTP GET {@code /unparseableEvents}: returns the saved parse exception reports as JSON.
     *
     * <p>The request goes through a READ authorization check on this task's datasource first. The
     * "buildSegments" entry is present only when {@code addBuildSegmentStatsToReport} accepts the current
     * ingestion state.
     *
     * @param httpServletRequest request handed to the authorization check
     * @param str                the "full" query parameter; only its presence (non-null) matters
     * @return an OK response whose entity is a map with at most the key "buildSegments"
     */
    @GET
    @Produces({"application/json"})
    @Path("/unparseableEvents")
    public Response getUnparseableEvents(@Context HttpServletRequest httpServletRequest, @QueryParam("full") String str) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);

        final boolean full = str != null;
        final Map<String, List<ParseExceptionReport>> events = new HashMap<>();
        if (addBuildSegmentStatsToReport(full, ingestionState)) {
            final List<ParseExceptionReport> buildSegmentsEvents =
                    IndexTaskUtils.getReportListFromSavedParseExceptions(parseExceptionHandler.getSavedParseExceptionReports());
            events.put("buildSegments", buildSegmentsEvents);
        }
        return Response.ok(events).build();
    }

    /**
     * Builds the row statistics map served by the row stats and live report endpoints.
     *
     * @param z flag forwarded to {@code addBuildSegmentStatsToReport} together with the ingestion state
     * @return a map with the keys "totals" and "movingAverages"; each value is a map that holds a
     *         "buildSegments" entry only when {@code addBuildSegmentStatsToReport} returned true
     */
    private Map<String, Object> doGetRowStats(boolean z) {
        final Map<String, Object> rowStats = new HashMap<>();
        final Map<String, Object> totals = new HashMap<>();
        final Map<String, Object> movingAverages = new HashMap<>();

        if (addBuildSegmentStatsToReport(z, ingestionState)) {
            totals.put("buildSegments", rowIngestionMeters.getTotals());
            movingAverages.put("buildSegments", rowIngestionMeters.getMovingAverages());
        }

        rowStats.put("totals", totals);
        rowStats.put("movingAverages", movingAverages);
        return rowStats;
    }

    /**
     * HTTP GET {@code /rowStats}: returns the row statistics map as JSON after a READ authorization check
     * on this task's datasource.
     *
     * @param httpServletRequest request handed to the authorization check
     * @param str                the "full" query parameter; only its presence (non-null) matters
     * @return an OK response whose entity is the map built by {@code doGetRowStats}
     */
    @GET
    @Produces({"application/json"})
    @Path("/rowStats")
    public Response getRowStats(@Context HttpServletRequest httpServletRequest, @QueryParam("full") String str) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        final boolean full = str != null;
        final Map<String, Object> rowStats = doGetRowStats(full);
        return Response.ok(rowStats).build();
    }

    /**
     * Builds the live ingestion stats report from the current ingestion state, the saved unparseable
     * events and the row statistics.
     *
     * @param z flag forwarded to {@code doGetRowStats}
     * @return the report map produced by {@code buildLiveIngestionStatsReport}
     */
    TaskReport.ReportMap doGetLiveReports(boolean z) {
        final IngestionState currentState = ingestionState;
        final Map<String, Object> unparseableEvents = getTaskCompletionUnparseableEvents();
        final Map<String, Object> rowStats = doGetRowStats(z);
        return buildLiveIngestionStatsReport(currentState, unparseableEvents, rowStats);
    }

    /**
     * HTTP GET {@code /liveReports}: returns the live ingestion report as JSON after a READ authorization
     * check on this task's datasource.
     *
     * @param httpServletRequest request handed to the authorization check
     * @param str                the "full" query parameter; only its presence (non-null) matters
     * @return an OK response whose entity is the report built by {@code doGetLiveReports}
     */
    @GET
    @Produces({"application/json"})
    @Path("/liveReports")
    public Response getLiveReports(@Context HttpServletRequest httpServletRequest, @QueryParam("full") String str) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        final boolean full = str != null;
        final TaskReport.ReportMap liveReports = doGetLiveReports(full);
        return Response.ok(liveReports).build();
    }

    /**
     * Returns the row totals of this task for the completion report.
     *
     * @return an immutable single-entry map from "buildSegments" to the row ingestion meter totals
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    protected Map<String, Object> getTaskCompletionRowStats() {
        final Object buildSegmentsTotals = rowIngestionMeters.getTotals();
        return Collections.singletonMap("buildSegments", buildSegmentsTotals);
    }

    /**
     * Builds the ingestion stats report written at the end of {@code runTask}. The report is always built
     * with state {@code COMPLETED} and carries the current error message, which is null unless
     * {@code runTask} caught an exception.
     *
     * @return the report map produced by {@code buildIngestionStatsReport}
     */
    private TaskReport.ReportMap getTaskCompletionReports() {
        final IngestionState reportedState = IngestionState.COMPLETED;
        return buildIngestionStatsReport(reportedState, errorMsg, null, null);
    }

    /**
     * Returns the saved parse exception reports for the completion and live reports.
     *
     * @return a mutable single-entry map from "buildSegments" to the saved parse exception reports, or to
     *         an empty list when no report list is available
     */
    @Override // org.apache.druid.indexing.common.task.AbstractBatchIndexTask
    protected Map<String, Object> getTaskCompletionUnparseableEvents() {
        final List<ParseExceptionReport> savedReports =
                IndexTaskUtils.getReportListFromSavedParseExceptions(parseExceptionHandler.getSavedParseExceptionReports());

        final Map<String, Object> events = new HashMap<>();
        events.put("buildSegments", savedReports == null ? ImmutableList.of() : savedReports);
        return events;
    }
}
