package org.apache.samza.monitor;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/monitor/LocalStoreMonitor.class */
/**
 * {@link Monitor} that cleans up local task stores which are no longer in use on this host.
 *
 * <p>On every {@link #monitor()} run it walks the job directories found directly under the
 * configured local store base directory (directory names of the form {@code <jobName>-<jobId>}).
 * For each job it compares the store directories on disk against the tasks reported by the
 * {@link JobsClient}. A store is considered actively used by a task only when the job has been
 * started, the task lists the store name, and the task's preferred host equals the local host
 * name. Every other (task, store) combination is handed to a two-phase sweep:
 * <ol>
 *   <li>if the task store contains an offset file older than the configured TTL, only the offset
 *       file is deleted;</li>
 *   <li>if the task store contains no offset file, the whole task store directory is deleted.</li>
 * </ol>
 */
public class LocalStoreMonitor implements Monitor {
    private static final Clock CLOCK = SystemClock.instance();
    private static final Logger LOG = LoggerFactory.getLogger(LocalStoreMonitor.class);
    private static final String OFFSET_FILE_NAME = "OFFSET-v2";
    private final JobsClient jobsClient;
    private final LocalStoreMonitorConfig config;
    private final LocalStoreMonitorMetrics localStoreMonitorMetrics;

    /**
     * Creates the monitor.
     *
     * @param localStoreMonitorConfig configuration; must provide a non-empty local store base directory
     * @param localStoreMonitorMetrics metrics updated while sweeping stores
     * @param jobsClient client used to query job status and tasks
     * @throws IllegalStateException if the local store base directory is null or empty
     */
    public LocalStoreMonitor(LocalStoreMonitorConfig localStoreMonitorConfig, LocalStoreMonitorMetrics localStoreMonitorMetrics, JobsClient jobsClient) {
        Preconditions.checkState(!Strings.isNullOrEmpty(localStoreMonitorConfig.getLocalStoreBaseDir()), "%s is not set in config.", "job.local.store.dir");
        this.config = localStoreMonitorConfig;
        this.jobsClient = jobsClient;
        this.localStoreMonitorMetrics = localStoreMonitorMetrics;
    }

    /**
     * Sweeps the task stores of every job directory found under the local store base directory.
     *
     * <p>A failure while processing one job increments the failed-deletion metric. It is rethrown
     * unless the config asks for failures to be ignored, in which case it is logged and the next
     * job is processed.
     *
     * @throws IllegalStateException if the configured base directory is not a directory
     * @throws Exception any failure raised while processing a job when failures are not ignored
     */
    @Override
    public void monitor() throws Exception {
        File localStoreDir = new File(this.config.getLocalStoreBaseDir());
        Preconditions.checkState(localStoreDir.isDirectory(), "LocalStoreDir: %s is not a directory", localStoreDir.getAbsolutePath());
        String localHostName = InetAddress.getLocalHost().getHostName();
        StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
        for (JobInstance jobInstance : getHostAffinityEnabledJobs(localStoreDir)) {
            File jobDir = new File(localStoreDir, String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId()));
            try {
                JobStatus jobStatus = this.jobsClient.getJobStatus(jobInstance);
                LOG.info("Job: {} has the status: {}.", jobInstance, jobStatus);
                // File.list returns null (not an empty array) on I/O error or when the directory
                // vanished; surface that as a descriptive failure instead of a NullPointerException.
                String[] storeNames = jobDir.list(DirectoryFileFilter.DIRECTORY);
                if (storeNames == null) {
                    throw new IOException(String.format("Unable to list the store directories under: %s", jobDir.getAbsolutePath()));
                }
                for (Task task : this.jobsClient.getTasks(jobInstance)) {
                    LOG.info("Evaluating stores for task: {}", task);
                    for (String storeName : storeNames) {
                        // Comparing from the local host name keeps this safe when the task has no preferred host.
                        boolean activelyUsed = jobStatus.hasBeenStarted()
                            && task.getStoreNames().contains(storeName)
                            && localHostName.equals(task.getPreferredHost());
                        if (activelyUsed) {
                            LOG.info("Local store: {} is actively used by the task: {}.", storeName, task.getTaskName());
                        } else {
                            LOG.info("Local store: {} not used by the task: {}.", storeName, task.getTaskName());
                            markSweepTaskStore(storageManagerUtil.getTaskStoreDir(jobDir, storeName, new TaskName(task.getTaskName()), TaskMode.Active));
                        }
                    }
                }
            } catch (Exception e) {
                this.localStoreMonitorMetrics.failedStoreDeletionAttempts.inc();
                if (!this.config.getIgnoreFailures()) {
                    throw e;
                }
                LOG.warn("Config: {} turned on, failures will be ignored. Local store cleanup for job: {} resulted in exception: {}.", LocalStoreMonitorConfig.CONFIG_IGNORE_FAILURES, jobInstance, e);
            }
        }
    }

    /**
     * Derives job instances from the sub-directories of the given base directory.
     *
     * <p>Each sub-directory name is split at its last {@code '-'} into job name and job id;
     * sub-directories whose name contains no {@code '-'} are skipped.
     *
     * @param file the local store base directory
     * @return the job instances parsed from the directory names; never null
     * @throws IOException if the base directory cannot be listed
     */
    private static List<JobInstance> getHostAffinityEnabledJobs(File file) throws IOException {
        File[] jobDirs = file.listFiles(File::isDirectory);
        if (jobDirs == null) {
            // listFiles signals I/O errors (or a non-directory path) with null rather than an exception.
            throw new IOException(String.format("Unable to list the job directories under: %s", file.getAbsolutePath()));
        }
        List<JobInstance> jobInstances = new ArrayList<>();
        for (File jobDir : jobDirs) {
            String dirName = jobDir.getName();
            int separatorIndex = dirName.lastIndexOf("-");
            if (separatorIndex != -1) {
                jobInstances.add(new JobInstance(dirName.substring(0, separatorIndex), dirName.substring(separatorIndex + 1)));
            }
        }
        return jobInstances;
    }

    /**
     * Applies the two-phase sweep to a single task store directory.
     *
     * <p>If the directory holds an offset file, the offset file is deleted once its last-modified
     * time is at least the configured TTL in the past; the store itself is left in place. If the
     * directory holds no offset file, the whole directory is deleted and the freed-space and
     * deleted-store metrics are updated. A directory that does not exist is ignored.
     *
     * @param taskStoreDir the task store directory to inspect
     * @throws IOException if the directory cannot be deleted
     */
    private void markSweepTaskStore(File taskStoreDir) throws IOException {
        if (!taskStoreDir.exists()) {
            // Nothing on disk for this task/store combination: there is nothing to sweep and
            // counting it as a deleted store would skew the metrics.
            return;
        }
        String taskStorePath = taskStoreDir.getAbsolutePath();
        File offsetFile = new File(taskStoreDir, OFFSET_FILE_NAME);
        if (offsetFile.exists()) {
            long lastModified = offsetFile.lastModified();
            long offsetFileTtl = this.config.getOffsetFileTTL();
            if (CLOCK.currentTimeMillis() - lastModified >= offsetFileTtl) {
                LOG.info("Deleting the offset file from the store: {}, since the last modified timestamp: {} is older than the configured ttl: {}.", taskStorePath, lastModified, offsetFileTtl);
                if (!offsetFile.delete()) {
                    LOG.warn("Unable to delete the offset file: {}.", offsetFile.getAbsolutePath());
                }
            }
            return;
        }
        LOG.info("Deleting the task store: {}, since it has no offset file.", taskStorePath);
        // Measure the directory contents before deleting; File.getTotalSpace() would report the
        // size of the whole partition rather than the space this store occupies.
        long taskStoreSizeInBytes = FileUtils.sizeOfDirectory(taskStoreDir);
        FileUtils.deleteDirectory(taskStoreDir);
        this.localStoreMonitorMetrics.diskSpaceFreedInBytes.inc(taskStoreSizeInBytes);
        this.localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
    }
}
