package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUpgradedFromSegmentIdsAction;
import org.apache.druid.indexing.common.actions.RetrieveUpgradedToSegmentIdsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.actions.UpgradedFromSegmentsResponse;
import org.apache.druid.indexing.common.actions.UpgradedToSegmentsResponse;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.class */
/**
 * Task of type {@value #TYPE} that removes unused segments of a datasource within the task interval.
 *
 * <p>Unused segments are retrieved in batches via {@link RetrieveUnusedSegmentsAction}. For every batch a
 * {@link SegmentNukeAction} is submitted first, and then the segments whose load specs are not referenced
 * elsewhere are handed to the toolbox's data segment killer. Processing stops when a batch comes back empty,
 * when the optional {@code limit} has been reached, or when the computed number of batches has been processed.
 */
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask {
    public static final String TYPE = "kill";
    private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

    /** Batch size used when the task spec does not provide one. */
    private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

    /** Optional versions forwarded to {@link RetrieveUnusedSegmentsAction}; presumably null means "any version". */
    @Nullable
    private final List<String> versions;

    /** Maximum number of unused segments requested per batch; always positive. */
    private final int batchSize;

    /** Optional upper bound on the total number of segments to delete; positive when present. */
    @Nullable
    private final Integer limit;

    /** Optional cutoff forwarded to {@link RetrieveUnusedSegmentsAction}. */
    @Nullable
    private final DateTime maxUsedStatusLastUpdatedTime;

    /**
     * Creates a kill task.
     *
     * @param id                           task id; passed through {@code getOrMakeId} together with the type,
     *                                     datasource and interval
     * @param dataSource                   datasource whose unused segments are to be removed
     * @param interval                     interval of the segments to remove
     * @param versions                     optional segment versions to restrict the kill to
     * @param context                      task context
     * @param batchSize                    segments per batch; defaults to {@code 100} when null
     * @param limit                        optional maximum number of segments to delete
     * @param maxUsedStatusLastUpdatedTime optional cutoff forwarded to the unused-segment retrieval
     * @throws org.apache.druid.error.DruidException (via {@link InvalidInput}) if {@code batchSize} or
     *                                     {@code limit} is not positive
     */
    @JsonCreator
    public KillUnusedSegmentsTask(
            @JsonProperty("id") String id,
            @JsonProperty("dataSource") String dataSource,
            @JsonProperty("interval") Interval interval,
            @JsonProperty("versions") @Nullable List<String> versions,
            @JsonProperty("context") Map<String, Object> context,
            @JsonProperty("batchSize") Integer batchSize,
            @JsonProperty("limit") @Nullable Integer limit,
            @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime) {
        super(getOrMakeId(id, TYPE, dataSource, interval), dataSource, interval, context);
        this.batchSize = batchSize != null ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
        if (this.batchSize <= 0) {
            throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
        }
        if (limit != null && limit <= 0) {
            throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
        }
        this.versions = versions;
        this.limit = limit;
        this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
    }

    /** @return the versions this task is restricted to, or null if none were given */
    @JsonProperty
    @Nullable
    public List<String> getVersions() {
        return this.versions;
    }

    /** @return the number of segments requested per batch */
    @JsonProperty
    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @return the maximum total number of segments to delete, or null if unbounded */
    @JsonProperty
    @Nullable
    public Integer getLimit() {
        return this.limit;
    }

    /** @return the cutoff forwarded to the unused-segment retrieval, or null if none was given */
    @JsonProperty
    @Nullable
    public DateTime getMaxUsedStatusLastUpdatedTime() {
        return this.maxUsedStatusLastUpdatedTime;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    /** @return an empty set; this task declares no input source resources */
    @Override
    @Nonnull
    @JsonIgnore
    public Set<ResourceAction> getInputSourceResources() {
        return ImmutableSet.of();
    }

    /**
     * Deletes unused segments batch by batch and writes a {@link KillTaskReport} when done.
     *
     * @param toolbox toolbox providing the task action client, segment killer and report writer
     * @return a success status for this task
     * @throws ISE       if the task's non-revoked locks do not cover a retrieved batch of segments
     * @throws Exception if any task action, the segment killer or the report writer fails
     */
    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        int numSegmentsKilled = 0;
        int numBatchesProcessed = 0;
        int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
        final Integer numTotalBatches = getNumTotalBatches();

        LOG.info(
                "Starting kill for datasource[%s] in interval[%s] and versions[%s] with batchSize[%d], up to limit[%d] segments before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
                getDataSource(),
                getInterval(),
                getVersions(),
                this.batchSize,
                this.limit,
                this.maxUsedStatusLastUpdatedTime,
                numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : ".");

        final TaskActionClient taskActionClient = toolbox.getTaskActionClient();

        // Load specs of every used segment (overshadowed ones included) in the interval. An unused segment
        // sharing one of these load specs must not be removed from deep storage.
        final Set<Map<String, Object>> usedSegmentLoadSpecs = taskActionClient
                .submit(new RetrieveUsedSegmentsAction(
                        getDataSource(),
                        ImmutableList.of(getInterval()),
                        Segments.INCLUDING_OVERSHADOWED))
                .stream()
                .map(DataSegment::getLoadSpec)
                .collect(Collectors.toSet());

        while (nextBatchSize > 0) {
            final List<DataSegment> unusedSegments = toolbox.getTaskActionClient().submit(
                    new RetrieveUnusedSegmentsAction(
                            getDataSource(),
                            getInterval(),
                            getVersions(),
                            nextBatchSize,
                            this.maxUsedStatusLastUpdatedTime));

            // Fail fast if the locks currently held by this task do not cover the batch.
            final NavigableMap<DateTime, List<TaskLock>> taskLockMap =
                    getNonRevokedTaskLockMap(toolbox.getTaskActionClient());
            if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
                throw new ISE(
                        "Locks[%s] for task[%s] can't cover segments[%s]",
                        taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
                        getId(),
                        unusedSegments);
            }

            // Order matters from here on: the upgrade lookup happens BEFORE the nuke action, and the nuke
            // action happens BEFORE anything is removed through the data segment killer.
            final Set<String> segmentIds = unusedSegments.stream()
                    .map(segment -> segment.getId().toString())
                    .collect(Collectors.toSet());
            final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
            try {
                upgradedFromSegmentIds.putAll(
                        taskActionClient
                                .submit(new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds))
                                .getUpgradedFromSegmentIds());
            } catch (Exception e) {
                // Deliberately best-effort: continue with an empty mapping if the action is unavailable.
                LOG.warn(e, "Could not retrieve parent segment ids using task action[retrieveUpgradedFromSegmentIds]. Overlord may be on an older version.");
            }

            taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));

            final List<DataSegment> segmentsToBeKilled =
                    getKillableSegments(unusedSegments, upgradedFromSegmentIds, usedSegmentLoadSpecs, taskActionClient);

            final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
            segmentsToBeKilled.forEach(segmentsNotKilled::remove);
            LOG.infoSegments(segmentsNotKilled, "Skipping segment kill from deep storage as their load specs are referenced by other segments.");

            toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
            numBatchesProcessed++;
            numSegmentsKilled += segmentsToBeKilled.size();
            LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());

            nextBatchSize = computeNextBatchSize(numSegmentsKilled);
            if (unusedSegments.isEmpty() || (numTotalBatches != null && numBatchesProcessed >= numTotalBatches)) {
                break;
            }
        }

        final String taskId = getId();
        LOG.info(
                "Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments in [%d] batches.",
                taskId,
                getDataSource(),
                getInterval(),
                numSegmentsKilled,
                numBatchesProcessed);
        toolbox.getTaskReportFileWriter().write(
                taskId,
                TaskReport.buildTaskReports(
                        new KillTaskReport(taskId, new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed))));
        return TaskStatus.success(taskId);
    }

    /**
     * Computes how many batches are needed to reach {@code limit}.
     *
     * @return {@code ceil(limit / batchSize)}, or null when no limit is set
     */
    @VisibleForTesting
    @JsonIgnore
    @Nullable
    Integer getNumTotalBatches() {
        if (this.limit == null) {
            return null;
        }
        // Divide as double: integer division would truncate before Math.ceil and drop the final partial batch.
        return (int) Math.ceil((double) this.limit / this.batchSize);
    }

    /**
     * Computes the size of the next batch to request.
     *
     * @param numSegmentsKilled number of segments deleted so far
     * @return {@code batchSize} when no limit is set, otherwise the smaller of {@code batchSize} and the
     *         remaining budget {@code limit - numSegmentsKilled}
     */
    @VisibleForTesting
    @JsonIgnore
    int computeNextBatchSize(int numSegmentsKilled) {
        if (this.limit == null) {
            return this.batchSize;
        }
        return Math.min(this.limit - numSegmentsKilled, this.batchSize);
    }

    /**
     * Groups the non-revoked locks of this task by the start of their interval.
     *
     * @param client client used to fetch the task's locks
     * @return sorted map from interval start to the non-revoked locks starting there
     * @throws IOException if the locks cannot be retrieved
     */
    private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException {
        final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
        for (TaskLock taskLock : getTaskLocks(client)) {
            if (!taskLock.isRevoked()) {
                taskLockMap
                        .computeIfAbsent(taskLock.getInterval().getStart(), start -> new ArrayList<>())
                        .add(taskLock);
            }
        }
        return taskLockMap;
    }

    /**
     * Determines which of the given unused segments may be handed to the data segment killer.
     *
     * <p>Segments are grouped under their parent id (the id they were upgraded from, or their own id when no
     * mapping exists). A group is dropped when {@link RetrieveUpgradedToSegmentIdsAction} reports a non-empty
     * set of ids for its parent. Remaining segments are additionally dropped when their load spec is among
     * {@code usedSegmentLoadSpecs}.
     *
     * @param unusedSegments         batch of unused segments
     * @param upgradedFromSegmentIds mapping from segment id to the id it was upgraded from
     * @param usedSegmentLoadSpecs   load specs of used segments, which must be preserved
     * @param taskActionClient       client used to submit the lookup action
     * @return the segments that can be killed
     */
    private List<DataSegment> getKillableSegments(
            List<DataSegment> unusedSegments,
            Map<String, String> upgradedFromSegmentIds,
            Set<Map<String, Object>> usedSegmentLoadSpecs,
            TaskActionClient taskActionClient) {
        final Map<String, Set<DataSegment>> parentIdToUnusedSegments = new HashMap<>();
        for (DataSegment segment : unusedSegments) {
            final String segmentId = segment.getId().toString();
            parentIdToUnusedSegments
                    .computeIfAbsent(upgradedFromSegmentIds.getOrDefault(segmentId, segmentId), parentId -> new HashSet<>())
                    .add(segment);
        }

        try {
            final UpgradedToSegmentsResponse response = taskActionClient.submit(
                    new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegments.keySet()));
            if (response != null && response.getUpgradedToSegmentIds() != null) {
                response.getUpgradedToSegmentIds().forEach((parentId, children) -> {
                    // A non-empty result means ids are still reported for this parent, so keep its group.
                    if (!CollectionUtils.isNullOrEmpty(children)) {
                        parentIdToUnusedSegments.remove(parentId);
                    }
                });
            }
        } catch (Exception e) {
            // Deliberately best-effort: fall back to the load-spec filter alone.
            LOG.warn(e, "Could not retrieve referenced ids using task action[retrieveUpgradedToSegmentIds]. Overlord may be on an older version.");
        }

        return parentIdToUnusedSegments.values().stream()
                .flatMap(Set::stream)
                .filter(segment -> !usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
                .collect(Collectors.toList());
    }

    /** @return {@link LookupLoadingSpec#NONE} */
    @Override
    public LookupLoadingSpec getLookupLoadingSpec() {
        return LookupLoadingSpec.NONE;
    }

    /** @return {@link BroadcastDatasourceLoadingSpec#NONE} */
    @Override
    public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() {
        return BroadcastDatasourceLoadingSpec.NONE;
    }

    /**
     * Tries to acquire a time chunk lock over the task interval.
     *
     * @param taskActionClient client used to submit the lock request
     * @return true if a lock was obtained, false if the try-acquire action returned null
     * @throws Exception if the action fails or the obtained lock is already revoked
     */
    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        final boolean useConcurrentLocks =
                Boolean.TRUE.equals(getContextValue(Tasks.USE_CONCURRENT_LOCKS, false));
        final TaskLockType lockType = determineLockType(useConcurrentLocks);
        final TaskLock lock = taskActionClient.submit(new TimeChunkLockTryAcquireAction(lockType, getInterval()));
        if (lock == null) {
            return false;
        }
        lock.assertNotRevoked();
        return true;
    }

    /**
     * Chooses the lock type for this task.
     *
     * @param useConcurrentLocks whether concurrent locks are enabled in the task context
     * @return {@link TaskLockType#REPLACE} when concurrent locks are enabled, otherwise the context value of
     *         {@code Tasks.TASK_LOCK_TYPE}, defaulting to {@link TaskLockType#EXCLUSIVE}
     */
    private TaskLockType determineLockType(boolean useConcurrentLocks) {
        if (useConcurrentLocks) {
            return TaskLockType.REPLACE;
        }
        return getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE);
    }
}
