package org.apache.druid.indexing.compact;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.compaction.CompactionRunSimulator;
import org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/indexing/compact/OverlordCompactionScheduler.class */
public class OverlordCompactionScheduler implements CompactionScheduler {
    private static final Logger log = new Logger(OverlordCompactionScheduler.class);
    private static final long SCHEDULE_PERIOD_SECONDS = 5;
    private static final Duration METRIC_EMISSION_PERIOD = Duration.standardMinutes(SCHEDULE_PERIOD_SECONDS);
    private final SegmentsMetadataManager segmentManager;
    private final LocalOverlordClient overlordClient;
    private final ServiceEmitter emitter;
    private final TaskMaster taskMaster;
    private final CompactionSupervisorConfig supervisorConfig;
    private final Supplier<DruidCompactionConfig> compactionConfigSupplier;
    private final ConcurrentHashMap<String, DataSourceCompactionConfig> activeDatasourceConfigs;
    private final ScheduledExecutorService executor;
    private final CompactionStatusTracker statusTracker;
    private final TaskRunnerListener taskRunnerListener;
    private final CompactSegments duty;
    private final boolean shouldPollSegments;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Stopwatch sinceStatsEmitted = Stopwatch.createUnstarted();

    @Inject
    public OverlordCompactionScheduler(TaskMaster taskMaster, TaskQueryTool taskQueryTool, SegmentsMetadataManager segmentsMetadataManager, Supplier<DruidCompactionConfig> supplier, final CompactionStatusTracker compactionStatusTracker, CompactionSupervisorConfig compactionSupervisorConfig, CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, ScheduledExecutorFactory scheduledExecutorFactory, ServiceEmitter serviceEmitter, ObjectMapper objectMapper) {
        this.segmentManager = segmentsMetadataManager;
        this.emitter = serviceEmitter;
        this.taskMaster = taskMaster;
        this.supervisorConfig = compactionSupervisorConfig;
        this.compactionConfigSupplier = supplier;
        this.executor = scheduledExecutorFactory.create(1, "CompactionScheduler-%s");
        this.statusTracker = compactionStatusTracker;
        this.shouldPollSegments = (segmentsMetadataManager == null || coordinatorOverlordServiceConfig.isEnabled()) ? false : true;
        this.overlordClient = new LocalOverlordClient(taskMaster, taskQueryTool, objectMapper);
        this.duty = new CompactSegments(this.statusTracker, this.overlordClient);
        this.activeDatasourceConfigs = new ConcurrentHashMap<>();
        this.taskRunnerListener = new TaskRunnerListener() { // from class: org.apache.druid.indexing.compact.OverlordCompactionScheduler.1
            @Override // org.apache.druid.indexing.overlord.TaskRunnerListener
            public String getListenerId() {
                return "OverlordCompactionScheduler";
            }

            @Override // org.apache.druid.indexing.overlord.TaskRunnerListener
            public void locationChanged(String str, TaskLocation taskLocation) {
            }

            @Override // org.apache.druid.indexing.overlord.TaskRunnerListener
            public void statusChanged(String str, TaskStatus taskStatus) {
                if (taskStatus.isComplete()) {
                    compactionStatusTracker.onTaskFinished(str, taskStatus);
                }
            }
        };
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public void start() {
        if (isEnabled() && this.started.compareAndSet(false, true)) {
            log.info("Starting compaction scheduler.", new Object[0]);
            initState();
            scheduleOnExecutor(this::scheduledRun);
        }
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public void stop() {
        if (isEnabled() && this.started.compareAndSet(true, false)) {
            log.info("Stopping compaction scheduler.", new Object[0]);
            cleanupState();
        }
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public boolean isRunning() {
        return isEnabled() && this.started.get();
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public CompactionConfigValidationResult validateCompactionConfig(DataSourceCompactionConfig dataSourceCompactionConfig) {
        return dataSourceCompactionConfig == null ? CompactionConfigValidationResult.failure("Cannot be null", new Object[0]) : ClientCompactionRunnerInfo.validateCompactionConfig(dataSourceCompactionConfig, this.supervisorConfig.getEngine());
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public void startCompaction(String str, DataSourceCompactionConfig dataSourceCompactionConfig) {
        if (isEnabled()) {
            this.activeDatasourceConfigs.put(str, dataSourceCompactionConfig);
        }
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public void stopCompaction(String str) {
        this.activeDatasourceConfigs.remove(str);
        this.statusTracker.removeDatasource(str);
    }

    private synchronized void initState() {
        Optional<TaskRunner> taskRunner = this.taskMaster.getTaskRunner();
        if (taskRunner.isPresent()) {
            ((TaskRunner) taskRunner.get()).registerListener(this.taskRunnerListener, Execs.directExecutor());
        }
        if (this.shouldPollSegments) {
            this.segmentManager.startPollingDatabasePeriodically();
        }
    }

    private synchronized void cleanupState() {
        Optional<TaskRunner> taskRunner = this.taskMaster.getTaskRunner();
        if (taskRunner.isPresent()) {
            ((TaskRunner) taskRunner.get()).unregisterListener(this.taskRunnerListener.getListenerId());
        }
        this.statusTracker.stop();
        this.activeDatasourceConfigs.clear();
        if (this.shouldPollSegments) {
            this.segmentManager.stopPollingDatabasePeriodically();
        }
    }

    private boolean isEnabled() {
        return this.supervisorConfig.isEnabled();
    }

    private synchronized void scheduledRun() {
        if (!isRunning()) {
            cleanupState();
            return;
        }
        try {
            runCompactionDuty();
        } catch (Exception e) {
            log.error(e, "Error processing compaction queue. Continuing schedule.", new Object[0]);
        }
        scheduleOnExecutor(this::scheduledRun);
    }

    private synchronized void runCompactionDuty() {
        CoordinatorRunStats coordinatorRunStats = new CoordinatorRunStats();
        this.duty.run(getLatestConfig(), getDatasourceSnapshot(), this.supervisorConfig.getEngine(), coordinatorRunStats);
        if (!this.sinceStatsEmitted.isRunning() || this.sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) {
            coordinatorRunStats.forEachStat((coordinatorStat, rowKey, j) -> {
                if (coordinatorStat.shouldEmit()) {
                    emitStat(coordinatorStat, rowKey.getValues(), j);
                }
            });
            this.sinceStatsEmitted.restart();
        } else {
            emitStat(Stats.Compaction.SUBMITTED_TASKS, Collections.emptyMap(), coordinatorRunStats.get(Stats.Compaction.SUBMITTED_TASKS));
        }
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public AutoCompactionSnapshot getCompactionSnapshot(String str) {
        return this.duty.getAutoCompactionSnapshot(str);
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public Map<String, AutoCompactionSnapshot> getAllCompactionSnapshots() {
        return this.duty.getAutoCompactionSnapshot();
    }

    @Override // org.apache.druid.indexing.compact.CompactionScheduler
    public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig clusterCompactionConfig) {
        return isRunning() ? new CompactionRunSimulator(this.statusTracker, this.overlordClient).simulateRunWithConfig(getLatestConfig().withClusterConfig(clusterCompactionConfig), getDatasourceSnapshot(), this.supervisorConfig.getEngine()) : new CompactionSimulateResult(Collections.emptyMap());
    }

    private void emitStat(CoordinatorStat coordinatorStat, Map<Dimension, String> map, long j) {
        ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
        map.forEach((dimension, str) -> {
            builder.setDimension(dimension.reportedName(), str);
        });
        this.emitter.emit(builder.setMetric(coordinatorStat.getMetricName(), Long.valueOf(j)));
    }

    private DruidCompactionConfig getLatestConfig() {
        return DruidCompactionConfig.empty().withClusterConfig(((DruidCompactionConfig) this.compactionConfigSupplier.get()).clusterConfig()).withDatasourceConfigs(new ArrayList(this.activeDatasourceConfigs.values()));
    }

    private DataSourcesSnapshot getDatasourceSnapshot() {
        return this.segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
    }

    private void scheduleOnExecutor(Runnable runnable) {
        this.executor.schedule(() -> {
            try {
                runnable.run();
            } catch (Throwable th) {
                log.error(th, "Error while executing runnable", new Object[0]);
            }
        }, SCHEDULE_PERIOD_SECONDS, TimeUnit.SECONDS);
    }
}
