package org.apache.kafka.streams.processor.internals;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StateDirectory.class */
public class StateDirectory {
    private static final Pattern TASK_DIR_PATH_NAME = Pattern.compile("\\d+_\\d+");
    private static final Pattern NAMED_TOPOLOGY_DIR_PATH_NAME = Pattern.compile("__.+__");
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
    static final String LOCK_FILE_NAME = ".lock";
    static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
    private final Time time;
    private final String appId;
    private final File stateDir;
    private final boolean hasPersistentStores;
    private final boolean hasNamedTopologies;
    private FileChannel stateDirLockChannel;
    private FileLock stateDirLock;
    private final Object taskDirCreationLock = new Object();
    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    @JsonIgnoreProperties(ignoreUnknown = true)
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StateDirectory$StateDirectoryProcessFile.class */
    public static class StateDirectoryProcessFile {

        @JsonProperty
        private final UUID processId;

        public StateDirectoryProcessFile() {
            this.processId = null;
        }

        StateDirectoryProcessFile(UUID uuid) {
            this.processId = uuid;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StateDirectory$TaskDirectory.class */
    public static class TaskDirectory {
        private final File file;
        private final String namedTopology;

        TaskDirectory(File file, String str) {
            this.file = file;
            this.namedTopology = str;
        }

        public File file() {
            return this.file;
        }

        public String namedTopology() {
            return this.namedTopology;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TaskDirectory taskDirectory = (TaskDirectory) obj;
            return this.file.equals(taskDirectory.file) && Objects.equals(this.namedTopology, taskDirectory.namedTopology);
        }

        public int hashCode() {
            return Objects.hash(this.file, this.namedTopology);
        }
    }

    public StateDirectory(StreamsConfig streamsConfig, Time time, boolean z, boolean z2) {
        this.time = time;
        this.hasPersistentStores = z;
        this.hasNamedTopologies = z2;
        this.appId = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        String string = streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG);
        File file = new File(string);
        this.stateDir = new File(file, this.appId);
        if (this.hasPersistentStores) {
            if (!file.exists() && !file.mkdirs()) {
                throw new ProcessorStateException(String.format("base state directory [%s] doesn't exist and couldn't be created", string));
            }
            if (!this.stateDir.exists() && !this.stateDir.mkdir()) {
                throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", this.stateDir.getPath()));
            }
            if (this.stateDir.exists() && !this.stateDir.isDirectory()) {
                throw new ProcessorStateException(String.format("state directory [%s] can't be created as there is an existing file with the same name", this.stateDir.getPath()));
            }
            if (string.startsWith(System.getProperty("java.io.tmpdir"))) {
                log.warn("Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [" + string + "]");
            }
            configurePermissions(file);
            configurePermissions(this.stateDir);
        }
    }

    private void configurePermissions(File file) {
        Path path = file.toPath();
        if (!path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            if (!(file.setReadable(true, true) & file.setWritable(true, true)) || !file.setExecutable(true, true)) {
                log.error("Failed to change permissions for the directory {}", file);
            }
        } else {
            try {
                Files.setPosixFilePermissions(path, PosixFilePermissions.fromString("rwxr-x---"));
            } catch (IOException e) {
                log.error("Error changing permissions for the directory {} ", path, e);
            }
        }
    }

    private boolean lockStateDirectory() {
        try {
            this.stateDirLockChannel = FileChannel.open(new File(this.stateDir, LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            this.stateDirLock = tryLock(this.stateDirLockChannel);
            return this.stateDirLock != null;
        } catch (IOException e) {
            log.error("Unable to lock the state directory due to unexpected exception", e);
            throw new ProcessorStateException(String.format("Failed to lock the state directory [%s] during startup", this.stateDir.getAbsolutePath()), e);
        }
    }

    public UUID initializeProcessId() {
        if (!this.hasPersistentStores) {
            return UUID.randomUUID();
        }
        if (!lockStateDirectory()) {
            log.error("Unable to obtain lock as state directory is already locked by another process");
            throw new StreamsException(String.format("Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory (current state directory is [%s]", this.stateDir.getAbsolutePath()));
        }
        File file = new File(this.stateDir, PROCESS_FILE_NAME);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            if (file.exists()) {
                try {
                    StateDirectoryProcessFile stateDirectoryProcessFile = (StateDirectoryProcessFile) objectMapper.readValue(file, StateDirectoryProcessFile.class);
                    log.info("Reading UUID from process file: {}", stateDirectoryProcessFile.processId);
                    if (stateDirectoryProcessFile.processId != null) {
                        return stateDirectoryProcessFile.processId;
                    }
                } catch (Exception e) {
                    log.warn("Failed to read json process file", e);
                }
            }
            StateDirectoryProcessFile stateDirectoryProcessFile2 = new StateDirectoryProcessFile(UUID.randomUUID());
            log.info("No process id found on disk, got fresh process id {}", stateDirectoryProcessFile2.processId);
            objectMapper.writeValue(file, stateDirectoryProcessFile2);
            return stateDirectoryProcessFile2.processId;
        } catch (IOException e2) {
            log.error("Unable to read/write process file due to unexpected exception", e2);
            throw new ProcessorStateException(e2);
        }
    }

    public File getOrCreateDirectoryForTask(TaskId taskId) {
        File taskDirectoryParentName = getTaskDirectoryParentName(taskId);
        File file = new File(taskDirectoryParentName, StateManagerUtil.toTaskDirString(taskId));
        if (this.hasPersistentStores) {
            if (!file.exists()) {
                synchronized (this.taskDirCreationLock) {
                    if (!taskDirectoryParentName.exists() && !taskDirectoryParentName.mkdir()) {
                        throw new ProcessorStateException(String.format("Parent [%s] of task directory [%s] doesn't exist and couldn't be created", taskDirectoryParentName.getPath(), file.getPath()));
                    }
                    if (!file.exists() && !file.mkdir()) {
                        throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", file.getPath()));
                    }
                }
            } else if (!file.isDirectory()) {
                throw new ProcessorStateException(String.format("state directory [%s] can't be created as there is an existing file with the same name", file.getPath()));
            }
        }
        return file;
    }

    private File getTaskDirectoryParentName(TaskId taskId) {
        String str = taskId.topologyName();
        if (str == null) {
            return this.stateDir;
        }
        if (this.hasNamedTopologies) {
            return new File(this.stateDir, getNamedTopologyDirName(str));
        }
        throw new IllegalStateException("Tried to lookup taskId with named topology, but StateDirectory thinks hasNamedTopologies = false");
    }

    private String getNamedTopologyDirName(String str) {
        return TaskId.NAMED_TOPOLOGY_DELIMITER + str + TaskId.NAMED_TOPOLOGY_DELIMITER;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File checkpointFileFor(TaskId taskId) {
        return new File(getOrCreateDirectoryForTask(taskId), ".checkpoint");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean directoryForTaskIsEmpty(TaskId taskId) {
        return taskDirIsEmpty(getOrCreateDirectoryForTask(taskId));
    }

    private boolean taskDirIsEmpty(File file) {
        File[] listFiles = file.listFiles(file2 -> {
            return !file2.getName().equals(".checkpoint");
        });
        boolean z = true;
        if (listFiles != null && listFiles.length > 0) {
            for (File file3 : listFiles) {
                if (!file3.getName().endsWith(LOCK_FILE_NAME)) {
                    log.trace("TaskDir {} was not empty, found {}", file, file3);
                    z = false;
                } else if (!file3.delete()) {
                    log.warn("Error encountered deleting lock file in {}", file);
                }
            }
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File globalStateDir() {
        File file = new File(this.stateDir, "global");
        if (this.hasPersistentStores) {
            if (!file.exists() && !file.mkdir()) {
                throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", file.getPath()));
            }
            if (file.exists() && !file.isDirectory()) {
                throw new ProcessorStateException(String.format("global state directory [%s] can't be created as there is an existing file with the same name", file.getPath()));
            }
        }
        return file;
    }

    private String logPrefix() {
        return String.format("stream-thread [%s]", Thread.currentThread().getName());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean lock(TaskId taskId) {
        if (!this.hasPersistentStores) {
            return true;
        }
        Thread thread = this.lockedTasksToOwner.get(taskId);
        if (thread != null) {
            if (!thread.equals(Thread.currentThread())) {
                return false;
            }
            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
            return true;
        }
        if (!this.stateDir.exists()) {
            log.error("Tried to lock task directory for {} but the state directory does not exist", taskId);
            throw new IllegalStateException("The state directory has been deleted");
        }
        this.lockedTasksToOwner.put(taskId, Thread.currentThread());
        getOrCreateDirectoryForTask(taskId);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void unlock(TaskId taskId) {
        Thread thread = this.lockedTasksToOwner.get(taskId);
        if (thread == null || !thread.equals(Thread.currentThread())) {
            return;
        }
        this.lockedTasksToOwner.remove(taskId);
        log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
    }

    public void close() {
        if (this.hasPersistentStores) {
            try {
                this.stateDirLock.release();
                this.stateDirLockChannel.close();
                this.stateDirLock = null;
                this.stateDirLockChannel = null;
                if (this.lockedTasksToOwner.isEmpty()) {
                    return;
                }
                log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", this.lockedTasksToOwner);
            } catch (IOException e) {
                log.error("Unexpected exception while unlocking the state dir", e);
                throw new StreamsException(String.format("Failed to release the lock on the state directory [%s]", this.stateDir.getAbsolutePath()), e);
            }
        }
    }

    public synchronized void clean() {
        try {
            cleanStateAndTaskDirectoriesCalledByUser();
            try {
                if (this.stateDir.exists()) {
                    Utils.delete(globalStateDir().getAbsoluteFile());
                }
                try {
                    if (this.hasPersistentStores && this.stateDir.exists() && !this.stateDir.delete()) {
                        log.warn(String.format("%s Failed to delete state store directory of %s for it is not empty", logPrefix(), this.stateDir.getAbsolutePath()));
                    }
                } catch (SecurityException e) {
                    log.error(String.format("%s Failed to delete state store directory of %s due to an unexpected exception", logPrefix(), this.stateDir.getAbsolutePath()), e);
                    throw new StreamsException(e);
                }
            } catch (IOException e2) {
                log.error(String.format("%s Failed to delete global state directory of %s due to an unexpected exception", logPrefix(), this.appId), e2);
                throw new StreamsException(e2);
            }
        } catch (Exception e3) {
            throw new StreamsException(e3);
        }
    }

    public synchronized void cleanRemovedTasks(long j) {
        try {
            cleanRemovedTasksCalledByCleanerThread(j);
        } catch (Exception e) {
            throw new IllegalStateException("Should have swallowed exception.", e);
        }
    }

    private void cleanRemovedTasksCalledByCleanerThread(long j) {
        for (TaskDirectory taskDirectory : listAllTaskDirectories()) {
            String name = taskDirectory.file().getName();
            TaskId parseTaskDirectoryName = StateManagerUtil.parseTaskDirectoryName(name, taskDirectory.namedTopology());
            if (!this.lockedTasksToOwner.containsKey(parseTaskDirectoryName)) {
                try {
                    try {
                        if (lock(parseTaskDirectoryName)) {
                            long milliseconds = this.time.milliseconds();
                            long lastModified = taskDirectory.file().lastModified();
                            if (milliseconds - j > lastModified) {
                                log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", new Object[]{logPrefix(), name, parseTaskDirectoryName, Long.valueOf(milliseconds - lastModified), Long.valueOf(j)});
                                Utils.delete(taskDirectory.file());
                            }
                        }
                        unlock(parseTaskDirectoryName);
                    } catch (IOException e) {
                        log.warn(String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s:", logPrefix(), name, parseTaskDirectoryName), e);
                        unlock(parseTaskDirectoryName);
                    }
                } catch (Throwable th) {
                    unlock(parseTaskDirectoryName);
                    throw th;
                }
            }
        }
        maybeCleanEmptyNamedTopologyDirs(true);
    }

    private IOException maybeCleanEmptyNamedTopologyDirs(boolean z) {
        if (!this.hasNamedTopologies) {
            return null;
        }
        AtomicReference atomicReference = new AtomicReference(null);
        File[] listFiles = this.stateDir.listFiles(file -> {
            return file.isDirectory() && NAMED_TOPOLOGY_DIR_PATH_NAME.matcher(file.getName()).matches();
        });
        if (listFiles != null) {
            for (File file2 : listFiles) {
                File[] listFiles2 = file2.listFiles();
                if (listFiles2 != null && listFiles2.length == 0) {
                    try {
                        Utils.delete(file2);
                    } catch (IOException e) {
                        if (z) {
                            log.warn(String.format("%sSwallowed the following exception during deletion of named topology directory %s", logPrefix(), file2.getName()), e);
                        } else {
                            log.error(String.format("%s Failed to delete named topology directory %s with exception:", logPrefix(), file2.getName()), e);
                        }
                        atomicReference.compareAndSet(null, e);
                    }
                }
            }
        }
        return (IOException) atomicReference.get();
    }

    public void clearLocalStateForNamedTopology(String str) {
        File file = new File(this.stateDir, getNamedTopologyDirName(str));
        if (!file.exists() || !file.isDirectory()) {
            log.debug("Tried to clear out the local state for NamedTopology {} but none was found", str);
        }
        try {
            Utils.delete(file);
        } catch (IOException e) {
            log.error("Hit an unexpected error while clearing local state for topology " + str, e);
            throw new StreamsException("Unable to delete state for the named topology " + str, e, new TaskId(-1, -1, str));
        }
    }

    private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception {
        if (!this.lockedTasksToOwner.isEmpty()) {
            log.warn("Found some still-locked task directories when user requested to cleaning up the state, since Streams is not running any more these will be ignored to complete the cleanup");
        }
        AtomicReference atomicReference = new AtomicReference();
        for (TaskDirectory taskDirectory : listAllTaskDirectories()) {
            String name = taskDirectory.file().getName();
            TaskId parseTaskDirectoryName = StateManagerUtil.parseTaskDirectoryName(name, taskDirectory.namedTopology());
            try {
                log.info("{} Deleting task directory {} for {} as user calling cleanup.", new Object[]{logPrefix(), name, parseTaskDirectoryName});
                if (this.lockedTasksToOwner.containsKey(parseTaskDirectoryName)) {
                    log.warn("{} Task {} in state directory {} was still locked by {}", new Object[]{logPrefix(), name, parseTaskDirectoryName, this.lockedTasksToOwner.get(parseTaskDirectoryName)});
                }
                Utils.delete(taskDirectory.file());
            } catch (IOException e) {
                log.error(String.format("%s Failed to delete task directory %s for %s with exception:", logPrefix(), name, parseTaskDirectoryName), e);
                atomicReference.compareAndSet(null, e);
            }
        }
        atomicReference.compareAndSet(null, maybeCleanEmptyNamedTopologyDirs(false));
        Exception exc = (Exception) atomicReference.get();
        if (exc != null) {
            throw exc;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<TaskDirectory> listNonEmptyTaskDirectories() {
        return listTaskDirectories(file -> {
            return file.isDirectory() && TASK_DIR_PATH_NAME.matcher(file.getName()).matches() && !taskDirIsEmpty(file);
        });
    }

    List<TaskDirectory> listAllTaskDirectories() {
        return listTaskDirectories(file -> {
            return file.isDirectory() && TASK_DIR_PATH_NAME.matcher(file.getName()).matches();
        });
    }

    private List<TaskDirectory> listTaskDirectories(FileFilter fileFilter) {
        ArrayList arrayList = new ArrayList();
        if (this.hasPersistentStores && this.stateDir.exists()) {
            if (this.hasNamedTopologies) {
                for (File file : listNamedTopologyDirs()) {
                    String parseNamedTopologyFromDirectory = parseNamedTopologyFromDirectory(file.getName());
                    File[] listFiles = file.listFiles(fileFilter);
                    if (listFiles != null) {
                        arrayList.addAll((Collection) Arrays.stream(listFiles).map(file2 -> {
                            return new TaskDirectory(file2, parseNamedTopologyFromDirectory);
                        }).collect(Collectors.toList()));
                    }
                }
            } else {
                File[] listFiles2 = this.stateDir.listFiles(fileFilter);
                if (listFiles2 != null) {
                    arrayList.addAll((Collection) Arrays.stream(listFiles2).map(file3 -> {
                        return new TaskDirectory(file3, null);
                    }).collect(Collectors.toList()));
                }
            }
        }
        return arrayList;
    }

    private List<File> listNamedTopologyDirs() {
        File[] listFiles = this.stateDir.listFiles(file -> {
            return file.getName().startsWith(TaskId.NAMED_TOPOLOGY_DELIMITER) && file.getName().endsWith(TaskId.NAMED_TOPOLOGY_DELIMITER);
        });
        return listFiles != null ? Arrays.asList(listFiles) : Collections.emptyList();
    }

    private String parseNamedTopologyFromDirectory(String str) {
        return str.substring(2, str.length() - 2);
    }

    private FileLock tryLock(FileChannel fileChannel) throws IOException {
        try {
            return fileChannel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }
}
