/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;

public class ParallelIndexSupervisorTask
extends AbstractTask
implements ChatHandler {
    public static final String TYPE = "index_parallel";
    private static final Logger log = new Logger(ParallelIndexSupervisorTask.class);
    private final ParallelIndexIngestionSpec ingestionSchema;
    private final FiniteFirehoseFactory<?, ?> baseFirehoseFactory;
    private final IndexingServiceClient indexingServiceClient;
    private final ChatHandlerProvider chatHandlerProvider;
    private final AuthorizerMapper authorizerMapper;
    private final RowIngestionMetersFactory rowIngestionMetersFactory;
    private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap();
    private volatile ParallelIndexTaskRunner runner;
    private volatile TaskToolbox toolbox;

    @JsonCreator
    public ParallelIndexSupervisorTask(@JsonProperty(value="id") String id, @JsonProperty(value="resource") TaskResource taskResource, @JsonProperty(value="spec") ParallelIndexIngestionSpec ingestionSchema, @JsonProperty(value="context") Map<String, Object> context, @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory) {
        super(ParallelIndexSupervisorTask.getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()), null, taskResource, ingestionSchema.getDataSchema().getDataSource(), context);
        this.ingestionSchema = ingestionSchema;
        FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
        if (!(firehoseFactory instanceof FiniteFirehoseFactory)) {
            throw new IAE("[%s] should implement FiniteFirehoseFactory", new Object[]{firehoseFactory.getClass().getSimpleName()});
        }
        this.baseFirehoseFactory = (FiniteFirehoseFactory)firehoseFactory;
        this.indexingServiceClient = indexingServiceClient;
        this.chatHandlerProvider = chatHandlerProvider;
        this.authorizerMapper = authorizerMapper;
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() != 0) {
            log.warn("maxSavedParseExceptions is not supported yet", new Object[0]);
        }
        if (ingestionSchema.getTuningConfig().getMaxParseExceptions() != Integer.MAX_VALUE) {
            log.warn("maxParseExceptions is not supported yet", new Object[0]);
        }
        if (ingestionSchema.getTuningConfig().isLogParseExceptions()) {
            log.warn("logParseExceptions is not supported yet", new Object[0]);
        }
    }

    @Override
    public int getPriority() {
        return this.getContextValue("priority", 50);
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @JsonProperty(value="spec")
    public ParallelIndexIngestionSpec getIngestionSchema() {
        return this.ingestionSchema;
    }

    @Nullable
    @VisibleForTesting
    ParallelIndexTaskRunner getRunner() {
        return this.runner;
    }

    @VisibleForTesting
    AuthorizerMapper getAuthorizerMapper() {
        return this.authorizerMapper;
    }

    @VisibleForTesting
    ParallelIndexTaskRunner createRunner(TaskToolbox toolbox) {
        if (this.ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
            throw new UnsupportedOperationException("Perfect roll-up is not supported yet");
        }
        this.runner = new SinglePhaseParallelIndexTaskRunner(toolbox, this.getId(), this.getGroupId(), this.ingestionSchema, this.getContext(), this.indexingServiceClient);
        return this.runner;
    }

    @VisibleForTesting
    void setRunner(ParallelIndexTaskRunner runner) {
        this.runner = runner;
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        Optional intervals = this.ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals();
        return !intervals.isPresent() || ParallelIndexSupervisorTask.isReady(taskActionClient, (SortedSet)intervals.get());
    }

    static boolean isReady(TaskActionClient actionClient, SortedSet<Interval> intervals) throws IOException {
        List<TaskLock> locks = ParallelIndexSupervisorTask.getTaskLocks(actionClient);
        if (locks.isEmpty()) {
            try {
                Tasks.tryAcquireExclusiveLocks(actionClient, intervals);
            }
            catch (Exception e) {
                log.error((Throwable)e, "Failed to acquire locks for intervals[%s]", new Object[]{intervals});
                return false;
            }
        }
        return true;
    }

    @Override
    public TaskStatus run(TaskToolbox toolbox) throws Exception {
        this.setToolbox(toolbox);
        log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider)Preconditions.checkNotNull((Object)this.chatHandlerProvider, (Object)"chatHandlerProvider")).getClass().getName()});
        this.chatHandlerProvider.register(this.getId(), (ChatHandler)this, false);
        try {
            if (this.isParallelMode()) {
                TaskStatus taskStatus = this.runParallel(toolbox);
                return taskStatus;
            }
            if (!this.baseFirehoseFactory.isSplittable()) {
                log.warn("firehoseFactory[%s] is not splittable. Running sequentially.", new Object[]{this.baseFirehoseFactory.getClass().getSimpleName()});
            } else if (this.ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
                log.warn("maxNumSubTasks is 1. Running sequentially. Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode.", new Object[0]);
            } else {
                throw new ISE("Unknown reason for sequentail mode. Failing this task.", new Object[0]);
            }
            TaskStatus taskStatus = this.runSequential(toolbox);
            return taskStatus;
        }
        finally {
            this.chatHandlerProvider.unregister(this.getId());
        }
    }

    private boolean isParallelMode() {
        return this.baseFirehoseFactory.isSplittable() && this.ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1;
    }

    @VisibleForTesting
    void setToolbox(TaskToolbox toolbox) {
        this.toolbox = toolbox;
    }

    private TaskStatus runParallel(TaskToolbox toolbox) throws Exception {
        this.createRunner(toolbox);
        return TaskStatus.fromCode((String)this.getId(), (TaskState)((ParallelIndexTaskRunner)Preconditions.checkNotNull((Object)this.runner, (Object)"runner")).run());
    }

    private TaskStatus runSequential(TaskToolbox toolbox) {
        return new IndexTask(this.getId(), this.getGroupId(), this.getTaskResource(), this.getDataSource(), new IndexTask.IndexIngestionSpec(this.getIngestionSchema().getDataSchema(), this.getIngestionSchema().getIOConfig(), ParallelIndexSupervisorTask.convertToIndexTuningConfig(this.getIngestionSchema().getTuningConfig())), this.getContext(), this.authorizerMapper, this.chatHandlerProvider, this.rowIngestionMetersFactory).run(toolbox);
    }

    private static IndexTask.IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig) {
        return new IndexTask.IndexTuningConfig(null, tuningConfig.getMaxRowsPerSegment(), tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxBytesInMemory(), tuningConfig.getMaxTotalRows(), null, tuningConfig.getNumShards(), null, tuningConfig.getIndexSpec(), tuningConfig.getMaxPendingPersists(), true, tuningConfig.isForceExtendableShardSpecs(), false, tuningConfig.isReportParseExceptions(), null, tuningConfig.getPushTimeout(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.isLogParseExceptions(), tuningConfig.getMaxParseExceptions(), tuningConfig.getMaxSavedParseExceptions());
    }

    @POST
    @Path(value="/segment/allocate")
    @Produces(value={"application/x-jackson-smile"})
    @Consumes(value={"application/x-jackson-smile"})
    public Response allocateSegment(DateTime timestamp, @Context HttpServletRequest req) {
        ChatHandlers.authorizationCheck((HttpServletRequest)req, (Action)Action.READ, (String)this.getDataSource(), (AuthorizerMapper)this.authorizerMapper);
        if (this.toolbox == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        try {
            SegmentIdWithShardSpec segmentIdentifier = this.allocateNewSegment(timestamp);
            return Response.ok((Object)this.toolbox.getObjectMapper().writeValueAsBytes((Object)segmentIdentifier)).build();
        }
        catch (IOException | IllegalStateException e) {
            return Response.serverError().entity((Object)Throwables.getStackTraceAsString((Throwable)e)).build();
        }
        catch (IllegalArgumentException e) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)Throwables.getStackTraceAsString((Throwable)e)).build();
        }
    }

    @VisibleForTesting
    SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException {
        String version;
        Interval interval;
        String dataSource = this.getDataSource();
        GranularitySpec granularitySpec = this.getIngestionSchema().getDataSchema().getGranularitySpec();
        Optional bucketIntervals = granularitySpec.bucketIntervals();
        List<TaskLock> locks = this.toolbox.getTaskActionClient().submit(new LockListAction());
        TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null);
        if (revokedLock != null) {
            throw new ISE("Lock revoked: [%s]", new Object[]{revokedLock});
        }
        Map<Interval, String> versions = locks.stream().collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
        boolean justLockedInterval = false;
        if (bucketIntervals.isPresent()) {
            Optional maybeInterval = granularitySpec.bucketInterval(timestamp);
            if (!maybeInterval.isPresent()) {
                throw new IAE("Could not find interval for timestamp [%s]", new Object[]{timestamp});
            }
            interval = (Interval)maybeInterval.get();
            if (!((SortedSet)bucketIntervals.get()).contains(interval)) {
                throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", new Object[]{interval, granularitySpec});
            }
            version = ParallelIndexSupervisorTask.findVersion(versions, interval);
            if (version == null) {
                throw new ISE("Cannot find a version for interval[%s]", new Object[]{interval});
            }
        } else {
            interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
            version = ParallelIndexSupervisorTask.findVersion(versions, interval);
            if (version == null) {
                TaskLock lock = (TaskLock)Preconditions.checkNotNull((Object)this.toolbox.getTaskActionClient().submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)), (String)"Cannot acquire a lock for interval[%s]", (Object[])new Object[]{interval});
                version = lock.getVersion();
                justLockedInterval = true;
            }
        }
        int partitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, interval);
        if (justLockedInterval && partitionNum != 0) {
            throw new ISE("Expected partitionNum to be 0 for interval [%s] right after locking, but got [%s]", new Object[]{interval, partitionNum});
        }
        return new SegmentIdWithShardSpec(dataSource, interval, version, (ShardSpec)new NumberedShardSpec(partitionNum, 0));
    }

    @Nullable
    private static String findVersion(Map<Interval, String> versions, Interval interval) {
        return versions.entrySet().stream().filter(entry -> ((Interval)entry.getKey()).contains((ReadableInterval)interval)).map(Map.Entry::getValue).findFirst().orElse(null);
    }

    @POST
    @Path(value="/report")
    @Consumes(value={"application/x-jackson-smile"})
    public Response report(PushedSegmentsReport report, @Context HttpServletRequest req) {
        ChatHandlers.authorizationCheck((HttpServletRequest)req, (Action)Action.WRITE, (String)this.getDataSource(), (AuthorizerMapper)this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        this.runner.collectReport(report);
        return Response.ok().build();
    }

    @GET
    @Path(value="/mode")
    @Produces(value={"application/json"})
    public Response getMode(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        return Response.ok((Object)(this.isParallelMode() ? "parallel" : "sequential")).build();
    }

    @GET
    @Path(value="/progress")
    @Produces(value={"application/json"})
    public Response getProgress(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok((Object)this.runner.getProgress()).build();
    }

    @GET
    @Path(value="/subtasks/running")
    @Produces(value={"application/json"})
    public Response getRunningTasks(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(this.runner.getRunningTaskIds()).build();
    }

    @GET
    @Path(value="/subtaskspecs")
    @Produces(value={"application/json"})
    public Response getSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(this.runner.getSubTaskSpecs()).build();
    }

    @GET
    @Path(value="/subtaskspecs/running")
    @Produces(value={"application/json"})
    public Response getRunningSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(this.runner.getRunningSubTaskSpecs()).build();
    }

    @GET
    @Path(value="/subtaskspecs/complete")
    @Produces(value={"application/json"})
    public Response getCompleteSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        return Response.ok(this.runner.getCompleteSubTaskSpecs()).build();
    }

    @GET
    @Path(value="/subtaskspec/{id}")
    @Produces(value={"application/json"})
    public Response getSubTaskSpec(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        SubTaskSpec subTaskSpec = this.runner.getSubTaskSpec(id);
        if (subTaskSpec == null) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).build();
        }
        return Response.ok(subTaskSpec).build();
    }

    @GET
    @Path(value="/subtaskspec/{id}/state")
    @Produces(value={"application/json"})
    public Response getSubTaskState(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        ParallelIndexTaskRunner.SubTaskSpecStatus subTaskSpecStatus = this.runner.getSubTaskState(id);
        if (subTaskSpecStatus == null) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).build();
        }
        return Response.ok((Object)subTaskSpecStatus).build();
    }

    @GET
    @Path(value="/subtaskspec/{id}/history")
    @Produces(value={"application/json"})
    public Response getCompleteSubTaskSpecAttemptHistory(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.runner == null) {
            return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).entity((Object)"task is not running yet").build();
        }
        TaskHistory taskHistory = this.runner.getCompleteSubTaskSpecAttemptHistory(id);
        if (taskHistory == null) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).build();
        }
        return Response.ok(taskHistory.getAttemptHistory()).build();
    }
}

