package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.class */
/**
 * Task that fetches partial segment files (produced by sub-tasks of the supervisor task) from intermediary
 * data servers, merges the files that share the same interval and partition id, pushes each merged segment,
 * and reports the pushed segments back to the supervisor task.
 *
 * <p>Construction requires {@code forceGuaranteedRollup}, a hashed (or absent) partitions spec, and
 * non-empty input intervals in the granularity spec.
 */
public class PartialSegmentMergeTask extends AbstractBatchIndexTask {
    public static final String TYPE = "partial_index_merge";
    private static final Logger LOG = new Logger(PartialSegmentMergeTask.class);
    private static final int BUFFER_SIZE = 4096;
    private static final int NUM_FETCH_RETRIES = 3;
    /** Maximum attempts for pushing one merged segment. */
    private static final int NUM_PUSH_RETRIES = 5;

    /** Copy buffer reused across fetches; fetches in this task run one after another. */
    private final byte[] buffer;
    private final int numAttempts;
    private final PartialSegmentMergeIngestionSpec ingestionSchema;
    private final String supervisorTaskId;
    private final IndexingServiceClient indexingServiceClient;
    private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
    private final HttpClient shuffleClient;

    @JsonCreator
    public PartialSegmentMergeTask(
            @JsonProperty("id") @Nullable String id,
            @JsonProperty("groupId") String groupId,
            @JsonProperty("resource") TaskResource taskResource,
            @JsonProperty("supervisorTaskId") String supervisorTaskId,
            @JsonProperty("numAttempts") int numAttempts,
            @JsonProperty("spec") PartialSegmentMergeIngestionSpec ingestionSchema,
            @JsonProperty("context") Map<String, Object> context,
            @JacksonInject IndexingServiceClient indexingServiceClient,
            @JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
            @JacksonInject @EscalatedClient HttpClient shuffleClient) {
        super(
                getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
                groupId,
                taskResource,
                ingestionSchema.getDataSchema().getDataSource(),
                context);
        final ParallelIndexTuningConfig tuningConfig = tuningConfigOf(ingestionSchema);
        Preconditions.checkArgument(tuningConfig.isForceGuaranteedRollup(), "forceGuaranteedRollup must be set");
        Preconditions.checkArgument(
                tuningConfig.getPartitionsSpec() == null
                        || tuningConfig.getPartitionsSpec() instanceof HashedPartitionsSpec,
                "Please use hashed_partitions for perfect rollup");
        Preconditions.checkArgument(
                !ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(),
                "Missing intervals in granularitySpec");
        this.buffer = new byte[BUFFER_SIZE];
        this.numAttempts = numAttempts;
        this.ingestionSchema = ingestionSchema;
        this.supervisorTaskId = supervisorTaskId;
        this.indexingServiceClient = indexingServiceClient;
        this.taskClientFactory = taskClientFactory;
        this.shuffleClient = shuffleClient;
    }

    /** Returns the spec's tuning config as a {@link ParallelIndexTuningConfig}. */
    private static ParallelIndexTuningConfig tuningConfigOf(PartialSegmentMergeIngestionSpec spec) {
        return (ParallelIndexTuningConfig) spec.getTuningConfig();
    }

    @JsonProperty
    public int getNumAttempts() {
        return numAttempts;
    }

    @JsonProperty("spec")
    public PartialSegmentMergeIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    @JsonProperty
    public String getSupervisorTaskId() {
        return supervisorTaskId;
    }

    @Override
    public boolean requireLockExistingSegments() {
        return true;
    }

    @Override
    public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals) {
        throw new UnsupportedOperationException("This method should be never called because PartialSegmentMergeTask always uses timeChunk locking but this method is supposed to be called only with segment locking.");
    }

    @Override
    public boolean isPerfectRollup() {
        return true;
    }

    /**
     * Returns the segment granularity from the granularity spec, or {@code null} when the spec is an
     * {@link ArbitraryGranularitySpec}.
     */
    @Override
    @Nullable
    public Granularity getSegmentGranularity() {
        final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
        if (granularitySpec instanceof ArbitraryGranularitySpec) {
            return null;
        }
        return granularitySpec.getSegmentGranularity();
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    /**
     * Runs the merge phase:
     * <ol>
     *   <li>groups the partition locations,</li>
     *   <li>resolves lock versions from the supervisor's locks,</li>
     *   <li>fetches and unzips the partial segments,</li>
     *   <li>merges and pushes them,</li>
     *   <li>reports the pushed segments to the supervisor task.</li>
     * </ol>
     */
    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        final Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToPartitions = groupPartitionLocations();
        final Map<Interval, String> intervalToVersion = collectLockVersions(toolbox);

        final Stopwatch fetchStopwatch = Stopwatch.createStarted();
        final Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles =
                fetchSegmentFiles(toolbox, intervalToPartitions);
        final long fetchSeconds = fetchStopwatch.elapsed(TimeUnit.SECONDS);
        fetchStopwatch.stop();
        LOG.info("Fetch took [%s] seconds", fetchSeconds);

        final ParallelIndexTuningConfig tuningConfig = tuningConfigOf(ingestionSchema);
        final ParallelIndexSupervisorTaskClient taskClient = taskClientFactory.build(
                new ClientBasedTaskInfoProvider(indexingServiceClient),
                getId(),
                1, // NOTE(review): presumably the client's thread count — confirm against IndexTaskClientFactory
                tuningConfig.getChatHandlerTimeout(),
                tuningConfig.getChatHandlerNumRetries());
        final HashedPartitionsSpec partitionsSpec =
                (HashedPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();

        // Start from an empty persist directory; merged outputs are written beneath it.
        final File persistDir = toolbox.getPersistDir();
        FileUtils.deleteQuietly(persistDir);
        FileUtils.forceMkdir(persistDir);

        final Set<DataSegment> pushedSegments = mergeAndPushSegments(
                toolbox,
                partitionsSpec,
                persistDir,
                intervalToVersion,
                intervalToUnzippedFiles);
        taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments));
        return TaskStatus.success(getId());
    }

    /** Groups this task's partition locations by interval, then by partition id. */
    private Map<Interval, Int2ObjectMap<List<PartitionLocation>>> groupPartitionLocations() {
        final Map<Interval, Int2ObjectMap<List<PartitionLocation>>> grouped = new HashMap<>();
        final PartialSegmentMergeIOConfig ioConfig = (PartialSegmentMergeIOConfig) ingestionSchema.getIOConfig();
        for (PartitionLocation location : ioConfig.getPartitionLocations()) {
            grouped.computeIfAbsent(location.getInterval(), interval -> new Int2ObjectOpenHashMap<>())
                    .computeIfAbsent(location.getPartitionId(), partitionId -> new ArrayList<>())
                    .add(location);
        }
        return grouped;
    }

    /**
     * Lists the supervisor task's locks (via a surrogate action) and maps each lock's interval to its version.
     *
     * @throws ISE if any lock is revoked, or if more than one lock covers the same interval
     */
    private Map<Interval, String> collectLockVersions(TaskToolbox toolbox) throws IOException {
        final Map<Interval, String> intervalToVersion = new HashMap<>();
        toolbox.getTaskActionClient()
                .submit(new SurrogateAction<>(supervisorTaskId, new LockListAction()))
                .forEach(lock -> {
                    if (lock.isRevoked()) {
                        throw new ISE("Lock[%s] is revoked", lock);
                    }
                    final String previousVersion = intervalToVersion.put(lock.getInterval(), lock.getVersion());
                    if (previousVersion != null) {
                        throw new ISE(
                                "WTH? Two versions([%s], [%s]) for the same interval[%s]?",
                                lock.getVersion(),
                                previousVersion,
                                lock.getInterval());
                    }
                });
        return intervalToVersion;
    }

    /**
     * Downloads every partial segment into the (recreated) firehose temporary directory and unzips each one.
     *
     * @return unzipped segment directories, grouped by interval and partition id
     */
    private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(
            TaskToolbox toolbox,
            Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToPartitions) throws IOException {
        final File tempDir = toolbox.getFirehoseTemporaryDir();
        FileUtils.deleteQuietly(tempDir);
        FileUtils.forceMkdir(tempDir);

        final Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = new HashMap<>();
        for (Map.Entry<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalEntry
                : intervalToPartitions.entrySet()) {
            final Interval interval = intervalEntry.getKey();
            for (Int2ObjectMap.Entry<List<PartitionLocation>> partitionEntry
                    : intervalEntry.getValue().int2ObjectEntrySet()) {
                final int partitionId = partitionEntry.getIntKey();
                final File partitionDir = FileUtils.getFile(
                        tempDir,
                        interval.getStart().toString(),
                        interval.getEnd().toString(),
                        Integer.toString(partitionId));
                FileUtils.forceMkdir(partitionDir);
                for (PartitionLocation location : partitionEntry.getValue()) {
                    final File zippedFile = fetchSegmentFile(partitionDir, location);
                    try {
                        final File unzippedDir =
                                new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
                        FileUtils.forceMkdir(unzippedDir);
                        CompressionUtils.unzip(zippedFile, unzippedDir);
                        intervalToUnzippedFiles
                                .computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>())
                                .computeIfAbsent(partitionId, k -> new ArrayList<>())
                                .add(unzippedDir);
                    } finally {
                        // The zip is no longer needed whether or not unzipping succeeded.
                        if (!zippedFile.delete()) {
                            LOG.warn("Failed to delete temp file[%s]", zippedFile);
                        }
                    }
                }
            }
        }
        return intervalToUnzippedFiles;
    }

    /**
     * Downloads one partial segment zip from its intermediary data server into {@code partitionDir}.
     * Failed fetches are retried up to {@link #NUM_FETCH_RETRIES} times.
     *
     * @return the downloaded (still zipped) file
     */
    @VisibleForTesting
    File fetchSegmentFile(File partitionDir, PartitionLocation location) throws IOException {
        final File zippedFile = new File(partitionDir, StringUtils.format("temp_%s", location.getSubTaskId()));
        final URI uri = location.toIntermediaryDataServerURI(supervisorTaskId);
        org.apache.druid.java.util.common.FileUtils.copyLarge(
                uri,
                target -> {
                    try {
                        return (InputStream) shuffleClient
                                .go(new Request(HttpMethod.GET, target.toURL()), new BytesAccumulatingResponseHandler())
                                .get();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for callers; interruption is not retried.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        // Must be an IOException so the retry predicate below actually retries failed fetches.
                        throw new IOException(e);
                    }
                },
                zippedFile,
                buffer,
                t -> t instanceof IOException,
                NUM_FETCH_RETRIES,
                StringUtils.format("Failed to fetch file[%s]", uri));
        return zippedFile;
    }

    /**
     * Merges the unzipped segments of every (interval, partition id) into one segment each and pushes the result.
     *
     * @param persistDir        base directory under which merged outputs are written
     * @param intervalToVersion lock version to stamp on segments of each interval
     * @return the segments returned by the pusher
     * @throws NullPointerException if an interval has no lock version or the partitions spec has no numShards
     */
    private Set<DataSegment> mergeAndPushSegments(
            TaskToolbox toolbox,
            HashedPartitionsSpec partitionsSpec,
            File persistDir,
            Map<Interval, String> intervalToVersion,
            Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles) throws Exception {
        final DataSegmentPusher segmentPusher = toolbox.getSegmentPusher();
        final ParallelIndexTuningConfig tuningConfig = tuningConfigOf(ingestionSchema);
        // Metric names are identical for every segment this task pushes; compute them once.
        final List<String> metricNames = Arrays.stream(ingestionSchema.getDataSchema().getAggregators())
                .map(aggregator -> aggregator.getName())
                .collect(Collectors.toList());

        final Set<DataSegment> pushedSegments = new HashSet<>();
        for (Map.Entry<Interval, Int2ObjectMap<List<File>>> intervalEntry : intervalToUnzippedFiles.entrySet()) {
            final Interval interval = intervalEntry.getKey();
            // Deterministic lookups: validate before the expensive merge instead of inside the push retry loop.
            final String version =
                    Preconditions.checkNotNull(intervalToVersion.get(interval), "version for interval[%s]", interval);
            for (Int2ObjectMap.Entry<List<File>> partitionEntry : intervalEntry.getValue().int2ObjectEntrySet()) {
                final int partitionId = partitionEntry.getIntKey();
                final HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec(
                        partitionId,
                        Preconditions.checkNotNull(partitionsSpec.getNumShards(), "numShards"),
                        partitionsSpec.getPartitionDimensions(),
                        toolbox.getObjectMapper());
                final Pair<File, List<String>> merged = mergeSegmentsInSamePartition(
                        ingestionSchema,
                        toolbox.getIndexIO(),
                        toolbox.getIndexMergerV9(),
                        partitionEntry.getValue(),
                        tuningConfig.getMaxNumSegmentsToMerge(),
                        persistDir,
                        0);
                final DataSegment segmentToPush = new DataSegment(
                        getDataSource(),
                        interval,
                        version,
                        (Map<String, Object>) null,
                        merged.rhs,
                        metricNames,
                        shardSpec,
                        (Integer) null,
                        0L);
                pushedSegments.add(RetryUtils.retry(
                        () -> segmentPusher.push(merged.lhs, segmentToPush, false),
                        t -> t instanceof Exception,
                        NUM_PUSH_RETRIES));
            }
        }
        return pushedSegments;
    }

    /**
     * Merges {@code indexes} into a single segment directory.
     *
     * <p>The files are merged in batches of at most {@code maxNumSegmentsToMerge}. If that leaves more than
     * one output, the method recurses over the outputs. Each batch is written to
     * {@code baseOutDir/merged_<n>}, where {@code n} starts at {@code outDirSuffix} and keeps increasing
     * across recursion levels. Input files are deleted once their batch has been processed.
     *
     * @return the merged segment directory and the merged dimension names
     * @throws IllegalArgumentException if {@code indexes} is empty, or if {@code maxNumSegmentsToMerge} is
     *                                  too small to ever reduce the inputs to one segment (which would otherwise
     *                                  loop or recurse forever)
     */
    private static Pair<File, List<String>> mergeSegmentsInSamePartition(
            PartialSegmentMergeIngestionSpec ingestionSpec,
            IndexIO indexIO,
            IndexMergerV9 merger,
            List<File> indexes,
            int maxNumSegmentsToMerge,
            File baseOutDir,
            int outDirSuffix) throws IOException {
        Preconditions.checkArgument(!indexes.isEmpty(), "No segments to merge");
        Preconditions.checkArgument(
                maxNumSegmentsToMerge > 1 || maxNumSegmentsToMerge >= indexes.size(),
                "maxNumSegmentsToMerge[%s] cannot reduce [%s] segments to one",
                maxNumSegmentsToMerge,
                indexes.size());

        final ParallelIndexTuningConfig tuningConfig = tuningConfigOf(ingestionSpec);
        int suffix = outDirSuffix;
        final List<File> mergedFiles = new ArrayList<>();
        List<String> dimensionNames = null;
        for (int start = 0; start < indexes.size(); start += maxNumSegmentsToMerge) {
            final List<File> filesToMerge =
                    indexes.subList(start, Math.min(start + maxNumSegmentsToMerge, indexes.size()));
            final List<QueryableIndex> indexesToMerge = new ArrayList<>(filesToMerge.size());
            // try-with-resources: loaded indexes are closed (and their files deleted) even if load/merge fails.
            try (Closer indexCleaner = Closer.create()) {
                for (File file : filesToMerge) {
                    final QueryableIndex queryableIndex = indexIO.loadIndex(file);
                    indexesToMerge.add(queryableIndex);
                    indexCleaner.register(() -> {
                        queryableIndex.close();
                        file.delete();
                    });
                }
                // Dimension names are computed only when every input fits into this single batch (the final round).
                if (maxNumSegmentsToMerge >= indexes.size()) {
                    dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge);
                }
                final File outDir = new File(baseOutDir, StringUtils.format("merged_%d", suffix++));
                mergedFiles.add(merger.mergeQueryableIndex(
                        indexesToMerge,
                        ingestionSpec.getDataSchema().getGranularitySpec().isRollup(),
                        ingestionSpec.getDataSchema().getAggregators(),
                        outDir,
                        tuningConfig.getIndexSpec(),
                        tuningConfig.getSegmentWriteOutMediumFactory()));
            }
        }

        if (mergedFiles.size() == 1) {
            return Pair.of(mergedFiles.get(0), Preconditions.checkNotNull(dimensionNames, "dimensionNames"));
        }
        return mergeSegmentsInSamePartition(
                ingestionSpec,
                indexIO,
                merger,
                mergedFiles,
                maxNumSegmentsToMerge,
                baseOutDir,
                suffix);
    }
}
