package org.apache.druid.indexing.worker.shuffle;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatus;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

/**
 * {@link IntermediaryDataManager} that keeps intermediary (shuffle) segment files on the local storage
 * locations returned by {@link TaskConfig#getShuffleDataLocations()}.
 *
 * <p>Each direct child of a storage location is treated as the directory of one supervisor task, named after
 * the supervisor task ID. For every known supervisor task this class remembers an expiry time
 * ({@code now + intermediaryPartitionTimeout}). Two periodic jobs run on a single-threaded scheduler:
 * <ul>
 *   <li>discovery: registers supervisor task directories that are not tracked yet and reserves space for the
 *       files found in them;</li>
 *   <li>cleanup: for supervisor tasks whose expiry time has passed, asks the {@link IndexingServiceClient} for
 *       their status and deletes their partitions if the task is complete, or extends the expiry otherwise.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code locationIterators} is a plain {@link HashMap} and the cyclic iterators stored in it
 * are not synchronized; confirm that {@link #addSegment} is never invoked concurrently.
 */
@ManageLifecycle
public class LocalIntermediaryDataManager implements IntermediaryDataManager {
    private static final Logger LOG = new Logger(LocalIntermediaryDataManager.class);

    /** Period, in seconds, between two runs of the discovery job. */
    private final long intermediaryPartitionDiscoveryPeriodSec;
    /** Period, in seconds, between two runs of the cleanup job. */
    private final long intermediaryPartitionCleanupPeriodSec;
    /** Added to "now" to compute the expiry time of a supervisor task's partitions. */
    private final Period intermediaryPartitionTimeout;
    private final TaskConfig taskConfig;
    private final List<StorageLocation> shuffleDataLocations;
    private final IndexingServiceClient indexingServiceClient;
    /** supervisorTaskId -> time after which the task's partitions are eligible for the cleanup check. */
    private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
    /** supervisorTaskId -> cyclic iterator over {@link #shuffleDataLocations} used to pick the next location. */
    private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
    /** Scheduler for the periodic jobs; {@code null} until {@link #start()} has been called. */
    private ScheduledExecutorService supervisorTaskChecker;

    /**
     * Creates the manager and builds one {@link StorageLocation} per configured shuffle data location.
     *
     * @param workerConfig          source of the discovery period, cleanup period and partition timeout
     * @param taskConfig            source of the shuffle data locations and of per-task temporary directories
     * @param indexingServiceClient client used by the cleanup job to look up supervisor task statuses
     */
    @Inject
    public LocalIntermediaryDataManager(WorkerConfig workerConfig, TaskConfig taskConfig, IndexingServiceClient indexingServiceClient) {
        this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
        this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
        this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
        this.taskConfig = taskConfig;
        this.shuffleDataLocations = taskConfig
            .getShuffleDataLocations()
            .stream()
            .map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
            .collect(Collectors.toList());
        this.indexingServiceClient = indexingServiceClient;
    }

    /**
     * Runs one discovery pass synchronously, then schedules the periodic discovery and cleanup jobs.
     * Exceptions thrown by a periodic run are logged and do not cancel subsequent runs.
     */
    @Override
    @LifecycleStart
    public void start() {
        discoverSupervisorTaskPartitions();
        this.supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
        this.supervisorTaskChecker.scheduleAtFixedRate(
            () -> {
                try {
                    discoverSupervisorTaskPartitions();
                } catch (Exception e) {
                    LOG.warn(e, "Error while discovering supervisorTasks");
                }
            },
            this.intermediaryPartitionDiscoveryPeriodSec,
            this.intermediaryPartitionDiscoveryPeriodSec,
            TimeUnit.SECONDS
        );
        this.supervisorTaskChecker.scheduleAtFixedRate(
            () -> {
                try {
                    deleteExpiredSupervisorTaskPartitionsIfNotRunning();
                } catch (InterruptedException e) {
                    LOG.error(e, "Error while cleaning up partitions for expired supervisors");
                    // Keep the interrupt visible to the executor instead of silently consuming it.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
                }
            },
            this.intermediaryPartitionCleanupPeriodSec,
            this.intermediaryPartitionCleanupPeriodSec,
            TimeUnit.SECONDS
        );
    }

    /**
     * Shuts the scheduler down (waiting up to 10 seconds for running jobs) and forgets all tracked supervisor
     * tasks. The tracked tasks are cleared even if the wait is interrupted.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is interrupted
     *                          while waiting; the thread's interrupt flag is restored before throwing
     */
    @Override
    @LifecycleStop
    public void stop() {
        try {
            if (this.supervisorTaskChecker != null) {
                this.supervisorTaskChecker.shutdownNow();
                try {
                    this.supervisorTaskChecker.awaitTermination(10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        } finally {
            this.supervisorTaskCheckTimes.clear();
        }
    }

    /**
     * Scans every storage location for supervisor task directories that are not tracked yet. For each new
     * directory, reserves space in the location for every file found under it (recursively) and registers an
     * expiry time for the supervisor task.
     */
    private void discoverSupervisorTaskPartitions() {
        for (StorageLocation location : this.shuffleDataLocations) {
            final Path locationPath = location.getPath().toPath().toAbsolutePath();
            final MutableInt numDiscovered = new MutableInt(0);
            final File[] supervisorTaskDirs = location.getPath().listFiles();
            if (supervisorTaskDirs != null) {
                for (File supervisorTaskDir : supervisorTaskDirs) {
                    // The mapping function only runs for directories that are not tracked yet.
                    this.supervisorTaskCheckTimes.computeIfAbsent(supervisorTaskDir.getName(), supervisorTaskId -> {
                        for (File partitionFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
                            final String relativePath =
                                locationPath.relativize(partitionFile.toPath().toAbsolutePath()).toString();
                            final File reserved =
                                location.reserve(relativePath, partitionFile.getName(), partitionFile.length());
                            if (reserved == null) {
                                LOG.warn("Can't add a discovered partition[%s]", partitionFile.getAbsolutePath());
                            }
                        }
                        numDiscovered.increment();
                        return getExpiryTimeFromNow();
                    });
                }
            }
            if (numDiscovered.intValue() > 0) {
                LOG.info(
                    "Discovered partitions for [%s] new supervisor tasks under location[%s]",
                    numDiscovered.getValue(),
                    location.getPath()
                );
            }
        }
    }

    /**
     * Looks up the status of every supervisor task whose expiry time has passed. Partitions of completed tasks
     * are deleted; tasks that are not complete get a fresh expiry time. Tasks for which no status is returned
     * are left untouched and will be checked again on the next run.
     *
     * @throws InterruptedException propagated from {@link IndexingServiceClient#getTaskStatuses}
     */
    private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException {
        final DateTime now = DateTimes.nowUtc();
        final HashSet<String> expiredSupervisorTasks = new HashSet<>();
        for (Map.Entry<String, DateTime> entry : this.supervisorTaskCheckTimes.entrySet()) {
            // The stored value is an expiry time, so a task is expired once that time lies in the past.
            if (entry.getValue().isBefore(now)) {
                expiredSupervisorTasks.add(entry.getKey());
            }
        }
        if (expiredSupervisorTasks.isEmpty()) {
            return;
        }
        LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());

        final Map<String, TaskStatus> taskStatuses = this.indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
        for (Map.Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
            final String supervisorTaskId = entry.getKey();
            if (entry.getValue().getStatusCode().isComplete()) {
                try {
                    deletePartitions(supervisorTaskId);
                } catch (IOException e) {
                    LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
                }
            } else {
                // Still running: check again after another timeout period.
                this.supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
            }
        }
    }

    /**
     * Zips the given segment directory and stores the zip file in one of the shuffle data locations.
     *
     * <p>The zip is first written to the sub task's temporary directory, which is deleted again before this
     * method returns. Locations are tried in round-robin order, starting from a random location the first time
     * a supervisor task is seen; a location is skipped if it cannot reserve the space or if writing to it fails.
     *
     * @param str         supervisor task ID
     * @param str2        sub task ID
     * @param dataSegment segment to store; its shard spec must be a {@link BucketNumberedShardSpec}
     * @param file        directory containing the segment files
     * @return the segment with its size set to the zipped size and its binary version read from {@code file}
     * @throws IOException if the temporary directory cannot be created, zipping fails, or zero bytes were zipped
     * @throws IAE         if the shard spec of {@code dataSegment} is not a {@link BucketNumberedShardSpec}
     * @throws ISE         if no location could store the segment
     */
    @Override
    public DataSegment addSegment(String str, String str2, DataSegment dataSegment, File file) throws IOException {
        final Iterator<StorageLocation> locations = this.locationIterators.computeIfAbsent(str, supervisorTaskId -> {
            final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(this.shuffleDataLocations);
            // Start from a random location so that different supervisor tasks spread over the locations.
            final int randomStart = ThreadLocalRandom.current().nextInt(this.shuffleDataLocations.size());
            for (int i = 0; i < randomStart; i++) {
                cyclicIterator.next();
            }
            return cyclicIterator;
        });
        final File taskTempDir = this.taskConfig.getTaskTempDir(str2);

        if (!(dataSegment.getShardSpec() instanceof BucketNumberedShardSpec)) {
            throw new IAE(
                "Invalid shardSpec type. Expected [%s] but got [%s]",
                BucketNumberedShardSpec.class.getName(),
                dataSegment.getShardSpec().getClass().getName()
            );
        }
        final BucketNumberedShardSpec<?> shardSpec = (BucketNumberedShardSpec<?>) dataSegment.getShardSpec();

        try (Closer closer = Closer.create()) {
            // Best-effort removal of the temporary directory on every exit path.
            closer.register(() -> {
                try {
                    FileUtils.forceDelete(taskTempDir);
                } catch (IOException e) {
                    LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
                }
            });

            FileUtils.forceMkdir(taskTempDir);
            final File tempZippedFile = new File(taskTempDir, dataSegment.getId().toString());
            final long unzippedSizeBytes = CompressionUtils.zip(file, tempZippedFile);
            if (unzippedSizeBytes == 0) {
                throw new IOE("Read 0 bytes from segmentDir[%s]", file.getAbsolutePath());
            }

            // Give every location at most one chance.
            for (int i = 0; i < this.shuffleDataLocations.size(); i++) {
                final StorageLocation location = locations.next();
                final String partitionFilePath =
                    getPartitionFilePath(str, str2, dataSegment.getInterval(), shardSpec.getBucketId());
                final File destFile =
                    location.reserve(partitionFilePath, dataSegment.getId().toString(), tempZippedFile.length());
                if (destFile == null) {
                    continue;
                }
                try {
                    FileUtils.forceMkdirParent(destFile);
                    org.apache.druid.java.util.common.FileUtils.writeAtomically(
                        destFile,
                        out -> Files.asByteSource(tempZippedFile).copyTo(out)
                    );
                    LOG.info("Wrote intermediary segment[%s] for subtask[%s] at [%s]", dataSegment.getId(), str2, destFile);
                    return dataSegment.withSize(unzippedSizeBytes).withBinaryVersion(SegmentUtils.getVersionFromDir(file));
                } catch (Exception e) {
                    // Undo the reservation and remove any partial output before trying the next location.
                    location.release(partitionFilePath, tempZippedFile.length());
                    FileUtils.deleteQuietly(destFile);
                    LOG.warn(e, "Failed to write segment[%s] at [%s]. Trying again with the next location", dataSegment.getId(), destFile);
                }
            }
            throw new ISE("Can't find location to handle segment[%s]", dataSegment);
        }
    }

    /**
     * Finds the partition file written by a sub task. Only the first location that contains the partition
     * directory is searched; finding that directory also refreshes the supervisor task's expiry time.
     *
     * @param str      supervisor task ID; validated with {@link IdUtils#validateId}
     * @param str2     name of the file to look for inside the partition directory (the sub task ID)
     * @param interval interval of the partition
     * @param i        bucket ID of the partition
     * @return a {@link ByteSource} over the matching file, or empty if the directory or the file does not exist
     */
    @Override
    public Optional<ByteSource> findPartitionFile(String str, String str2, Interval interval, int i) {
        IdUtils.validateId("supervisorTaskId", str);
        for (StorageLocation location : this.shuffleDataLocations) {
            final File partitionDir = new File(location.getPath(), getPartitionDirPath(str, interval, i));
            if (!partitionDir.exists()) {
                continue;
            }
            // The partition is still being read, so push its expiry further into the future.
            this.supervisorTaskCheckTimes.put(str, getExpiryTimeFromNow());
            final File[] segmentFiles = partitionDir.listFiles();
            if (segmentFiles == null) {
                return Optional.empty();
            }
            for (File segmentFile : segmentFiles) {
                if (segmentFile.getName().equals(str2)) {
                    return Optional.of(Files.asByteSource(segmentFile));
                }
            }
            return Optional.empty();
        }
        return Optional.empty();
    }

    /**
     * Builds the {@link GenericPartitionStat} describing where the given segment's partition can be fetched:
     * host, port and TLS flag of the task executor node, plus the segment's interval and shard spec. The two
     * trailing constructor arguments are passed as {@code null}.
     *
     * @param taskToolbox toolbox providing the task executor node
     * @param dataSegment segment whose interval and shard spec are reported
     * @throws ClassCastException if the segment's shard spec is not a {@link BucketNumberedShardSpec}
     */
    @Override
    public GenericPartitionStat generatePartitionStat(TaskToolbox taskToolbox, DataSegment dataSegment) {
        return new GenericPartitionStat(
            taskToolbox.getTaskExecutorNode().getHost(),
            taskToolbox.getTaskExecutorNode().getPortToUse(),
            taskToolbox.getTaskExecutorNode().isEnableTlsPort(),
            dataSegment.getInterval(),
            (BucketNumberedShardSpec<?>) dataSegment.getShardSpec(),
            null,
            null
        );
    }

    /** Returns the current UTC time plus the configured intermediary partition timeout. */
    private DateTime getExpiryTimeFromNow() {
        return DateTimes.nowUtc().plus(this.intermediaryPartitionTimeout);
    }

    /**
     * Deletes the directory of the given supervisor task from every storage location, unregistering each
     * contained file from the location first, and stops tracking the supervisor task.
     *
     * @param str supervisor task ID; validated with {@link IdUtils#validateId}
     * @throws IOException if a directory cannot be deleted; the supervisor task stays tracked in that case
     */
    @Override
    public void deletePartitions(String str) throws IOException {
        IdUtils.validateId("supervisorTaskId", str);
        for (StorageLocation location : this.shuffleDataLocations) {
            final File supervisorTaskDir = new File(location.getPath(), str);
            if (supervisorTaskDir.exists()) {
                LOG.info("Cleaning up [%s]", supervisorTaskDir);
                for (File partitionFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
                    location.removeFile(partitionFile);
                }
                FileUtils.forceDelete(supervisorTaskDir);
            }
        }
        this.supervisorTaskCheckTimes.remove(str);
    }
}
