package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.class */
public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHandler {
    /** Task type identifier: returned by {@link #getType()} and passed to {@code getOrMakeId} in the constructor. */
    public static final String TYPE = "index_parallel";
    private static final Logger log = new Logger(ParallelIndexSupervisorTask.class);
    /** Ingestion spec supplied to the constructor; exposed as the "spec" JSON property by {@link #getIngestionSchema()}. */
    private final ParallelIndexIngestionSpec ingestionSchema;
    /** Firehose factory taken from the spec's IO config; its {@code isSplittable()} result is one input to {@code isParallelMode()}. */
    private final FiniteFirehoseFactory<?, ?> baseFirehoseFactory;
    /** Injected client (declared nullable in the constructor); handed to the runner built in {@code createRunner}. */
    private final IndexingServiceClient indexingServiceClient;
    /** Injected provider (declared nullable in the constructor); {@code run} requires it to be non-null and registers/unregisters this task with it. */
    private final ChatHandlerProvider chatHandlerProvider;
    /** Used for the authorization checks of the HTTP endpoints and passed on to the {@code IndexTask} built in {@code runSequential}. */
    private final AuthorizerMapper authorizerMapper;
    /** Passed on to the {@code IndexTask} built in {@code runSequential}. */
    private final RowIngestionMetersFactory rowIngestionMetersFactory;
    /** Per-interval counters; {@code allocateNewSegment} feeds this map to {@code Counters.getAndIncrementInt} to obtain the partition number of each allocated segment. */
    private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval;
    /** Assigned by {@code createRunner} or {@code setRunner}; while it is null the runner-backed endpoints answer SERVICE_UNAVAILABLE. */
    private volatile ParallelIndexTaskRunner runner;
    /** Assigned by {@code setToolbox}, which {@code run} calls first; while it is null {@code allocateSegment} answers SERVICE_UNAVAILABLE. */
    private volatile TaskToolbox toolbox;

    /**
     * Builds the supervisor task from its JSON spec and the injected services.
     *
     * <p>The task id is obtained from {@code getOrMakeId} using {@code str}, {@link #TYPE} and the data source
     * of the spec. The spec's firehose factory is validated and remembered as the base firehose factory.
     * A warning is logged for each parse-exception tuning option that is set to a non-default value, because
     * this task does not support them yet.
     *
     * @param str                       requested task id (JSON property "id")
     * @param taskResource              task resource (JSON property "resource")
     * @param parallelIndexIngestionSpec ingestion spec (JSON property "spec"); dereferenced unconditionally
     * @param map                       task context (JSON property "context")
     * @param indexingServiceClient     injected client, may be null
     * @param chatHandlerProvider       injected chat handler provider, may be null here
     * @param authorizerMapper          injected authorizer mapper
     * @param rowIngestionMetersFactory injected meters factory
     * @throws IAE if the spec's firehose factory is null or is not a {@link FiniteFirehoseFactory}
     */
    @JsonCreator
    public ParallelIndexSupervisorTask(
            @JsonProperty("id") String str,
            @JsonProperty("resource") TaskResource taskResource,
            @JsonProperty("spec") ParallelIndexIngestionSpec parallelIndexIngestionSpec,
            @JsonProperty("context") Map<String, Object> map,
            @Nullable @JacksonInject IndexingServiceClient indexingServiceClient,
            @Nullable @JacksonInject ChatHandlerProvider chatHandlerProvider,
            @JacksonInject AuthorizerMapper authorizerMapper,
            @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
    ) {
        super(
                getOrMakeId(str, TYPE, parallelIndexIngestionSpec.getDataSchema().getDataSource()),
                null,
                taskResource,
                parallelIndexIngestionSpec.getDataSchema().getDataSource(),
                map
        );
        this.partitionNumCountersPerInterval = new ConcurrentHashMap<>();
        this.ingestionSchema = parallelIndexIngestionSpec;

        FiniteFirehoseFactory<?, ?> firehoseFactory = parallelIndexIngestionSpec.m33getIOConfig().getFirehoseFactory();
        if (!(firehoseFactory instanceof FiniteFirehoseFactory)) {
            // instanceof is also false for null; describe that case instead of failing with an NPE
            // while building the error message.
            final String actualType = firehoseFactory == null ? null : firehoseFactory.getClass().getSimpleName();
            throw new IAE("[%s] should implement FiniteFirehoseFactory", actualType);
        }
        this.baseFirehoseFactory = firehoseFactory;
        this.indexingServiceClient = indexingServiceClient;
        this.chatHandlerProvider = chatHandlerProvider;
        this.authorizerMapper = authorizerMapper;
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;

        // Warn about tuning options that are accepted in the spec but not honoured by this task.
        final ParallelIndexTuningConfig tuningConfig = parallelIndexIngestionSpec.m32getTuningConfig();
        if (tuningConfig.getMaxSavedParseExceptions() != 0) {
            log.warn("maxSavedParseExceptions is not supported yet");
        }
        if (tuningConfig.getMaxParseExceptions() != Integer.MAX_VALUE) {
            log.warn("maxParseExceptions is not supported yet");
        }
        if (tuningConfig.isLogParseExceptions()) {
            log.warn("logParseExceptions is not supported yet");
        }
    }

    /**
     * Returns the task priority: the context value stored under {@code Tasks.PRIORITY_KEY},
     * or 50 when the context does not provide one.
     */
    @Override
    public int getPriority() {
        final Integer priority = (Integer) getContextValue(Tasks.PRIORITY_KEY, 50);
        return priority.intValue();
    }

    /** Returns the task type identifier, {@link #TYPE}. */
    @Override
    public String getType() {
        return ParallelIndexSupervisorTask.TYPE;
    }

    /** Returns the ingestion spec this task was created with; serialized as the "spec" JSON property. */
    @JsonProperty("spec")
    public ParallelIndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /** Returns the current runner, or null if none has been created or set yet. */
    @VisibleForTesting
    @Nullable
    ParallelIndexTaskRunner getRunner() {
        return runner;
    }

    /** Returns the authorizer mapper injected at construction time. */
    @VisibleForTesting
    AuthorizerMapper getAuthorizerMapper() {
        return authorizerMapper;
    }

    /**
     * Creates a {@link SinglePhaseParallelIndexTaskRunner} for this task, stores it as the current runner
     * and returns it.
     *
     * @param taskToolbox toolbox handed to the new runner
     * @return the runner that is now installed on this task
     * @throws UnsupportedOperationException if the tuning config asks for forced guaranteed roll-up
     */
    @VisibleForTesting
    ParallelIndexTaskRunner createRunner(TaskToolbox taskToolbox) {
        final boolean perfectRollupRequested = ingestionSchema.m32getTuningConfig().isForceGuaranteedRollup();
        if (perfectRollupRequested) {
            throw new UnsupportedOperationException("Perfect roll-up is not supported yet");
        }
        runner = new SinglePhaseParallelIndexTaskRunner(
                taskToolbox,
                getId(),
                getGroupId(),
                ingestionSchema,
                getContext(),
                indexingServiceClient
        );
        return runner;
    }

    /** Replaces the current runner with the given one. */
    @VisibleForTesting
    void setRunner(ParallelIndexTaskRunner parallelIndexTaskRunner) {
        runner = parallelIndexTaskRunner;
    }

    /**
     * Reports whether the task may start. When the granularity spec has no explicit bucket intervals the
     * task is ready immediately; otherwise readiness is decided by
     * {@link #isReady(TaskActionClient, SortedSet)} for those intervals.
     */
    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        final Optional<SortedSet<Interval>> maybeIntervals =
                ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals();
        if (maybeIntervals.isPresent()) {
            return isReady(taskActionClient, maybeIntervals.get());
        }
        return true;
    }

    /**
     * Returns true if {@code getTaskLocks} already reports locks for this task, or if exclusive locks for
     * all given intervals can be acquired now. Any exception raised while acquiring is logged and turned
     * into a {@code false} result.
     *
     * @param taskActionClient client used to list and acquire locks
     * @param sortedSet        intervals to lock
     */
    static boolean isReady(TaskActionClient taskActionClient, SortedSet<Interval> sortedSet) throws IOException {
        if (getTaskLocks(taskActionClient).isEmpty()) {
            try {
                Tasks.tryAcquireExclusiveLocks(taskActionClient, sortedSet);
            } catch (Exception e) {
                log.error(e, "Failed to acquire locks for intervals[%s]", sortedSet);
                return false;
            }
        }
        return true;
    }

    /**
     * Runs the task. Stores the toolbox, registers this task as a chat handler, then runs either in
     * parallel mode or sequentially; the chat handler is always unregistered on the way out.
     *
     * <p>Sequential mode is entered when the firehose factory is not splittable or when
     * {@code maxNumSubTasks} is exactly 1. Any other reason for not being in parallel mode fails the task.
     *
     * @throws NullPointerException if no chat handler provider was injected
     * @throws ISE if the task is not in parallel mode for an unrecognized reason
     */
    @Override
    public TaskStatus run(TaskToolbox taskToolbox) throws Exception {
        setToolbox(taskToolbox);

        final ChatHandlerProvider provider = Preconditions.checkNotNull(chatHandlerProvider, "chatHandlerProvider");
        log.info("Found chat handler of class[%s]", provider.getClass().getName());
        provider.register(getId(), this, false);

        try {
            if (isParallelMode()) {
                return runParallel(taskToolbox);
            }

            // Not parallel: explain why before falling back to sequential ingestion.
            if (baseFirehoseFactory.isSplittable()) {
                if (ingestionSchema.m32getTuningConfig().getMaxNumSubTasks() != 1) {
                    throw new ISE("Unknown reason for sequentail mode. Failing this task.");
                }
                log.warn("maxNumSubTasks is 1. Running sequentially. Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode.");
            } else {
                log.warn(
                        "firehoseFactory[%s] is not splittable. Running sequentially.",
                        baseFirehoseFactory.getClass().getSimpleName()
                );
            }
            return runSequential(taskToolbox);
        } finally {
            provider.unregister(getId());
        }
    }

    /** Parallel mode requires a splittable firehose factory and {@code maxNumSubTasks} greater than 1. */
    private boolean isParallelMode() {
        if (!baseFirehoseFactory.isSplittable()) {
            return false;
        }
        return ingestionSchema.m32getTuningConfig().getMaxNumSubTasks() > 1;
    }

    /** Stores the toolbox used by the segment allocation code path. */
    @VisibleForTesting
    void setToolbox(TaskToolbox taskToolbox) {
        toolbox = taskToolbox;
    }

    /**
     * Creates the runner, runs it, and wraps the code it returns into a {@link TaskStatus} for this task id.
     *
     * @throws NullPointerException if no runner is installed after {@code createRunner} returns
     */
    private TaskStatus runParallel(TaskToolbox taskToolbox) throws Exception {
        createRunner(taskToolbox);
        final String taskId = getId();
        final ParallelIndexTaskRunner activeRunner = Preconditions.checkNotNull(runner, "runner");
        return TaskStatus.fromCode(taskId, activeRunner.run());
    }

    /**
     * Runs the ingestion in this process by delegating to an {@link IndexTask} that reuses this task's
     * id, group id, resource, data source, context and a converted copy of its ingestion spec.
     */
    private TaskStatus runSequential(TaskToolbox taskToolbox) {
        final ParallelIndexIngestionSpec parallelSpec = getIngestionSchema();
        final IndexTask.IndexIngestionSpec sequentialSpec = new IndexTask.IndexIngestionSpec(
                parallelSpec.getDataSchema(),
                parallelSpec.m33getIOConfig(),
                convertToIndexTuningConfig(parallelSpec.m32getTuningConfig())
        );
        final IndexTask sequentialTask = new IndexTask(
                getId(),
                getGroupId(),
                getTaskResource(),
                getDataSource(),
                sequentialSpec,
                getContext(),
                authorizerMapper,
                chatHandlerProvider,
                rowIngestionMetersFactory
        );
        return sequentialTask.run(taskToolbox);
    }

    /**
     * Translates a parallel tuning config into the tuning config type accepted by {@link IndexTask}.
     *
     * <p>The constructor is positional: the literal {@code null}/{@code true}/{@code false} arguments fill
     * slots for which nothing is copied from the parallel config. Their meaning is defined by
     * {@code IndexTask.IndexTuningConfig}'s constructor, which is not visible here.
     */
    private static IndexTask.IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig parallelIndexTuningConfig) {
        return new IndexTask.IndexTuningConfig(
                null,
                parallelIndexTuningConfig.getMaxRowsPerSegment(),
                Integer.valueOf(parallelIndexTuningConfig.getMaxRowsInMemory()),
                Long.valueOf(parallelIndexTuningConfig.getMaxBytesInMemory()),
                parallelIndexTuningConfig.getMaxTotalRows(),
                null,
                parallelIndexTuningConfig.getNumShards(),
                null,
                parallelIndexTuningConfig.getIndexSpec(),
                Integer.valueOf(parallelIndexTuningConfig.getMaxPendingPersists()),
                true,
                Boolean.valueOf(parallelIndexTuningConfig.isForceExtendableShardSpecs()),
                false,
                Boolean.valueOf(parallelIndexTuningConfig.isReportParseExceptions()),
                null,
                Long.valueOf(parallelIndexTuningConfig.getPushTimeout()),
                parallelIndexTuningConfig.getSegmentWriteOutMediumFactory(),
                Boolean.valueOf(parallelIndexTuningConfig.isLogParseExceptions()),
                Integer.valueOf(parallelIndexTuningConfig.getMaxParseExceptions()),
                Integer.valueOf(parallelIndexTuningConfig.getMaxSavedParseExceptions())
        );
    }

    /**
     * HTTP endpoint that allocates a new segment id for the given timestamp and returns it serialized with
     * the toolbox's object mapper.
     *
     * <p>Responses: SERVICE_UNAVAILABLE while no toolbox is set; BAD_REQUEST with a stack trace when
     * allocation throws {@link IllegalArgumentException}; server error with a stack trace when it throws
     * {@link IOException} or {@link IllegalStateException}; otherwise OK with the serialized id.
     *
     * <p>NOTE(review): the authorization check uses {@code Action.READ} although this endpoint allocates a
     * segment, whereas {@code report} checks {@code Action.WRITE} — confirm this is intended.
     *
     * @param dateTime           timestamp the new segment must cover
     * @param httpServletRequest request used for the authorization check
     */
    @POST
    @Produces({"application/x-jackson-smile"})
    @Path("/segment/allocate")
    public Response allocateSegment(DateTime dateTime, @Context HttpServletRequest httpServletRequest) {
        ChatHandlers.authorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);

        if (toolbox == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }

        try {
            final SegmentIdWithShardSpec segmentId = allocateNewSegment(dateTime);
            final byte[] payload = toolbox.getObjectMapper().writeValueAsBytes(segmentId);
            return Response.ok(payload).build();
        } catch (IllegalArgumentException badInput) {
            return Response.status(Response.Status.BAD_REQUEST)
                           .entity(Throwables.getStackTraceAsString(badInput))
                           .build();
        } catch (IOException | IllegalStateException failure) {
            return Response.serverError()
                           .entity(Throwables.getStackTraceAsString(failure))
                           .build();
        }
    }

    /**
     * Allocates the next segment id for the bucket interval that contains {@code dateTime}.
     *
     * <p>Steps:
     * <ol>
     *   <li>List this task's locks and fail if any of them has been revoked.</li>
     *   <li>Resolve the bucket interval and its version. With explicit bucket intervals in the granularity
     *       spec, the interval must be one of them and a lock covering it must already exist. Without
     *       explicit intervals, the interval comes from the segment granularity and an exclusive lock is
     *       acquired on the spot when no existing lock covers it.</li>
     *   <li>Take the next partition number for the interval from the per-interval counters.</li>
     * </ol>
     *
     * @param dateTime timestamp the new segment must cover
     * @return the id of the newly allocated segment, with a {@link NumberedShardSpec}
     * @throws IOException if a task action fails with an I/O error
     * @throws IAE if the granularity spec has explicit intervals but none contains {@code dateTime}
     * @throws ISE if a lock is revoked, the interval is not among the explicit intervals, no version can
     *             be found for it, or a freshly locked interval already has a non-zero partition counter
     * @throws NullPointerException if a lock for a not-yet-locked interval cannot be acquired
     */
    @VisibleForTesting
    SegmentIdWithShardSpec allocateNewSegment(DateTime dateTime) throws IOException {
        final String dataSource = getDataSource();
        final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec();
        final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals();

        // Locks are listed on every allocation so that a revoked lock is noticed before a segment is handed out.
        final List<TaskLock> locks = this.toolbox.getTaskActionClient().submit(new LockListAction());
        final TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null);
        if (revokedLock != null) {
            throw new ISE("Lock revoked: [%s]", revokedLock);
        }
        final Map<Interval, String> versions = locks
                .stream()
                .collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));

        final Interval interval;
        String version;
        boolean justLockedInterval = false;
        if (bucketIntervals.isPresent()) {
            // Explicit intervals: the interval must be one of them and must already be covered by a lock.
            final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(dateTime);
            if (!maybeInterval.isPresent()) {
                throw new IAE("Could not find interval for timestamp [%s]", dateTime);
            }
            interval = maybeInterval.get();
            if (!bucketIntervals.get().contains(interval)) {
                throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
            }
            version = findVersion(versions, interval);
            if (version == null) {
                throw new ISE("Cannot find a version for interval[%s]", interval);
            }
        } else {
            // No explicit intervals: derive the bucket from the segment granularity and lock it if needed.
            interval = granularitySpec.getSegmentGranularity().bucket(dateTime);
            version = findVersion(versions, interval);
            if (version == null) {
                final TaskLock newLock = Preconditions.checkNotNull(
                        this.toolbox.getTaskActionClient().submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)),
                        "Cannot acquire a lock for interval[%s]",
                        interval
                );
                version = newLock.getVersion();
                justLockedInterval = true;
            }
        }

        final int partitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, interval);
        if (justLockedInterval && partitionNum != 0) {
            // An interval locked just now cannot legitimately have had segments allocated before.
            throw new ISE(
                    "Expected partitionNum to be 0 for interval [%s] right after locking, but got [%s]",
                    interval,
                    Integer.valueOf(partitionNum)
            );
        }
        return new SegmentIdWithShardSpec(dataSource, interval, version, new NumberedShardSpec(partitionNum, 0));
    }

    /**
     * Looks up the version of the first entry, in the map's iteration order, whose key interval contains
     * the given interval.
     *
     * @param map      lock interval to version
     * @param interval interval that must be contained in a key of {@code map}
     * @return the matching version, or null when no key contains {@code interval}
     */
    @Nullable
    private static String findVersion(Map<Interval, String> map, Interval interval) {
        return map.entrySet()
                  .stream()
                  .filter(candidate -> candidate.getKey().contains(interval))
                  .map(Map.Entry::getValue)
                  .findFirst()
                  .orElse(null);
    }

    /**
     * HTTP endpoint through which a pushed-segments report is delivered to the runner.
     * Requires {@code Action.WRITE} on the data source. Answers SERVICE_UNAVAILABLE while no runner
     * exists, otherwise forwards the report to the runner and answers OK.
     */
    @POST
    @Path("/report")
    @Consumes({"application/x-jackson-smile"})
    public Response report(PushedSegmentsReport pushedSegmentsReport, @Context HttpServletRequest httpServletRequest) {
        ChatHandlers.authorizationCheck(httpServletRequest, Action.WRITE, getDataSource(), authorizerMapper);
        if (runner != null) {
            runner.collectReport(pushedSegmentsReport);
            return Response.ok().build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * HTTP endpoint reporting the ingestion mode: "parallel" when {@code isParallelMode()} holds,
     * otherwise "sequential". Requires {@code Action.READ} on the data source.
     */
    @GET
    @Produces({"application/json"})
    @Path("/mode")
    public Response getMode(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        final String mode;
        if (isParallelMode()) {
            mode = "parallel";
        } else {
            mode = "sequential";
        }
        return Response.ok(mode).build();
    }

    /**
     * HTTP endpoint returning the runner's progress. Requires {@code Action.READ} on the data source and
     * answers SERVICE_UNAVAILABLE while no runner exists.
     */
    @GET
    @Produces({"application/json"})
    @Path("/progress")
    public Response getProgress(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getProgress()).build();
    }

    /**
     * HTTP endpoint returning the ids of the runner's running tasks. Requires {@code Action.READ} on the
     * data source and answers SERVICE_UNAVAILABLE while no runner exists.
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtasks/running")
    public Response getRunningTasks(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getRunningTaskIds()).build();
    }

    /**
     * HTTP endpoint returning the runner's sub task specs. Requires {@code Action.READ} on the data source
     * and answers SERVICE_UNAVAILABLE while no runner exists.
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspecs")
    public Response getSubTaskSpecs(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getSubTaskSpecs()).build();
    }

    /**
     * HTTP endpoint returning the runner's running sub task specs. Requires {@code Action.READ} on the
     * data source and answers SERVICE_UNAVAILABLE while no runner exists.
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspecs/running")
    public Response getRunningSubTaskSpecs(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getRunningSubTaskSpecs()).build();
    }

    /**
     * HTTP endpoint returning the runner's complete sub task specs. Requires {@code Action.READ} on the
     * data source and answers SERVICE_UNAVAILABLE while no runner exists.
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspecs/complete")
    public Response getCompleteSubTaskSpecs(@Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        return Response.ok(runner.getCompleteSubTaskSpecs()).build();
    }

    /**
     * HTTP endpoint returning the sub task spec with the given id. Requires {@code Action.READ} on the
     * data source. Answers SERVICE_UNAVAILABLE while no runner exists and NOT_FOUND when the runner
     * returns no spec for the id.
     *
     * @param str sub task spec id taken from the request path
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspec/{id}")
    public Response getSubTaskSpec(@PathParam("id") String str, @Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final SubTaskSpec spec = runner.getSubTaskSpec(str);
        if (spec == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(spec).build();
    }

    /**
     * HTTP endpoint returning the state of the sub task spec with the given id. Requires
     * {@code Action.READ} on the data source. Answers SERVICE_UNAVAILABLE while no runner exists and
     * NOT_FOUND when the runner returns no state for the id.
     *
     * @param str sub task spec id taken from the request path
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspec/{id}/state")
    public Response getSubTaskState(@PathParam("id") String str, @Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final ParallelIndexTaskRunner.SubTaskSpecStatus status = runner.getSubTaskState(str);
        if (status == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(status).build();
    }

    /**
     * HTTP endpoint returning the attempt history of the complete sub task spec with the given id.
     * Requires {@code Action.READ} on the data source. Answers SERVICE_UNAVAILABLE while no runner exists
     * and NOT_FOUND when the runner returns no history for the id.
     *
     * @param str sub task spec id taken from the request path
     */
    @GET
    @Produces({"application/json"})
    @Path("/subtaskspec/{id}/history")
    public Response getCompleteSubTaskSpecAttemptHistory(@PathParam("id") String str, @Context HttpServletRequest httpServletRequest) {
        IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, Action.READ, getDataSource(), authorizerMapper);
        if (runner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final TaskHistory history = runner.getCompleteSubTaskSpecAttemptHistory(str);
        if (history == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(history.getAttemptHistory()).build();
    }
}
