/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.storage;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageTraverser;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory;
import org.apache.kafka.server.log.remote.storage.Transferer;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LocalTieredStorage
implements RemoteStorageManager {
    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
    public static final String STORAGE_DIR_CONFIG = "dir";
    public static final String DELETE_ON_CLOSE_CONFIG = "delete.on.close";
    public static final String TRANSFERER_CLASS_CONFIG = "transferer.class";
    public static final String ENABLE_DELETE_API_CONFIG = "delete.enable";
    public static final String BROKER_ID = "broker.id";
    private static final String ROOT_STORAGE_DIR_NAME = "kafka-tiered-storage";
    private volatile File storageDirectory;
    private volatile boolean deleteOnClose = false;
    private volatile boolean deleteEnabled = true;
    private volatile Transferer transferer = new Transferer(){

        @Override
        public void transfer(File from, File to) throws IOException {
            if (from.exists()) {
                Files.copy(from.toPath(), to.toPath(), new CopyOption[0]);
            }
        }

        @Override
        public void transfer(ByteBuffer from, File to) throws IOException {
            if (from != null && from.hasRemaining()) {
                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
                     FileChannel channel = fileOutputStream.getChannel();){
                    channel.write(from);
                }
            }
        }
    };
    private volatile int brokerId = -1;
    private Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
    private final LocalTieredStorageListener.LocalTieredStorageListeners storageListeners = new LocalTieredStorageListener.LocalTieredStorageListeners();
    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();

    public LocalTieredStorage() {
        this.history.listenTo(this);
    }

    public void traverse(LocalTieredStorageTraverser traverser) {
        Objects.requireNonNull(traverser);
        File[] files = this.storageDirectory.listFiles();
        if (files == null) {
            return;
        }
        Arrays.stream(files).filter(File::isDirectory).forEach(dir -> RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory(dir.getName(), this.storageDirectory).traverse(traverser));
    }

    public void addListener(LocalTieredStorageListener listener) {
        this.storageListeners.add(listener);
    }

    public void configure(Map<String, ?> configs) {
        if (this.storageDirectory != null) {
            throw new InvalidConfigurationException(String.format("This instance of local remote storageis already configured. The existing storage directory is %s. Ensure the method configure() is only called once.", this.storageDirectory.getAbsolutePath()));
        }
        String storageDir = (String)configs.get(STORAGE_DIR_CONFIG);
        String shouldDeleteOnClose = (String)configs.get(DELETE_ON_CLOSE_CONFIG);
        String transfererClass = (String)configs.get(TRANSFERER_CLASS_CONFIG);
        String isDeleteEnabled = (String)configs.get(ENABLE_DELETE_API_CONFIG);
        Integer brokerIdInt = (Integer)configs.get(BROKER_ID);
        if (brokerIdInt == null) {
            throw new InvalidConfigurationException("Broker ID is required to configure the LocalTieredStorage manager.");
        }
        this.brokerId = brokerIdInt;
        this.logger = new LogContext(String.format("[LocalTieredStorage Id=%d] ", this.brokerId)).logger(this.getClass());
        if (shouldDeleteOnClose != null) {
            this.deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
        }
        if (isDeleteEnabled != null) {
            this.deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
        }
        if (transfererClass != null) {
            try {
                this.transferer = (Transferer)this.getClass().getClassLoader().loadClass(transfererClass).newInstance();
            }
            catch (ClassCastException | ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                throw new RuntimeException(String.format("Cannot create transferer from class '%s'", transfererClass), e);
            }
        }
        if (storageDir == null) {
            this.storageDirectory = TestUtils.tempDirectory((String)"kafka-tiered-storage-");
            this.logger.debug("No storage directory specified, created temporary directory: {}", (Object)this.storageDirectory.getAbsolutePath());
        } else {
            this.storageDirectory = new File(storageDir, ROOT_STORAGE_DIR_NAME);
            boolean existed = Files.exists(this.storageDirectory.toPath(), new LinkOption[0]);
            if (!existed) {
                try {
                    this.logger.info("Creating directory: [{}]", (Object)this.storageDirectory.getAbsolutePath());
                    Files.createDirectories(this.storageDirectory.toPath(), new FileAttribute[0]);
                }
                catch (IOException e) {
                    throw new RuntimeException(String.format("Not able to create the storage directory '%s'", this.storageDirectory.getAbsolutePath()), e);
                }
            } else {
                this.logger.warn("Remote storage with ID [{}] already exists on the file system. Any data already in the remote storage will not be deleted and may result in an inconsistent state and/or provide stale data.", (Object)storageDir);
            }
        }
        this.logger.info("Created local tiered storage manager [{}]:[{}]", (Object)this.brokerId, (Object)this.storageDirectory.getName());
    }

    public Optional<RemoteLogSegmentMetadata.CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata metadata, LogSegmentData data) throws RemoteStorageException {
        Callable<Optional> callable = () -> {
            LocalTieredStorageEvent.Builder eventBuilder = this.newEventBuilder(LocalTieredStorageEvent.EventType.COPY_SEGMENT, metadata);
            RemoteLogSegmentFileset fileset = null;
            try {
                fileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, metadata);
                this.logger.info("Offloading log segment for {} from segment={}", (Object)metadata.topicIdPartition(), (Object)data.logSegment());
                fileset.copy(this.transferer, data);
                this.storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
            }
            catch (Exception e) {
                if (fileset != null) {
                    fileset.delete();
                }
                this.storageListeners.onStorageEvent(eventBuilder.withException(e).build());
                throw e;
            }
            return Optional.empty();
        };
        return this.wrap(callable);
    }

    public InputStream fetchLogSegment(RemoteLogSegmentMetadata metadata, int startPos) throws RemoteStorageException {
        return this.fetchLogSegment(metadata, startPos, metadata.segmentSizeInBytes());
    }

    public InputStream fetchLogSegment(RemoteLogSegmentMetadata metadata, int startPos, int endPos) throws RemoteStorageException {
        LocalTieredStorage.checkArgument(startPos >= 0, "Start position must be positive", startPos);
        LocalTieredStorage.checkArgument(endPos >= startPos, "End position cannot be less than startPosition", startPos, endPos);
        LocalTieredStorage.checkArgument(metadata.segmentSizeInBytes() >= endPos, "End position cannot be greater than segment size", endPos, metadata.segmentSizeInBytes());
        return this.wrap(() -> {
            LocalTieredStorageEvent.Builder eventBuilder = this.newEventBuilder(LocalTieredStorageEvent.EventType.FETCH_SEGMENT, metadata);
            eventBuilder.withStartPosition(startPos).withEndPosition(endPos);
            try {
                RemoteLogSegmentFileset fileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, metadata);
                InputStream inputStream = Files.newInputStream(fileset.getFile(RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT).toPath(), StandardOpenOption.READ);
                inputStream.skip(startPos);
                this.storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
                return inputStream;
            }
            catch (Exception e) {
                this.storageListeners.onStorageEvent(eventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public InputStream fetchIndex(RemoteLogSegmentMetadata metadata, RemoteStorageManager.IndexType indexType) throws RemoteStorageException {
        LocalTieredStorageEvent.EventType eventType = this.getEventTypeForFetch(indexType);
        RemoteLogSegmentFileset.RemoteLogSegmentFileType fileType = this.getLogSegmentFileType(indexType);
        return this.wrap(() -> {
            LocalTieredStorageEvent.Builder eventBuilder = this.newEventBuilder(eventType, metadata);
            try {
                RemoteLogSegmentFileset fileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, metadata);
                File file = fileset.getFile(fileType);
                if (fileType.isOptional() && !file.exists()) {
                    throw new RemoteResourceNotFoundException("Index file for type: " + indexType + " not found for segment " + metadata.remoteLogSegmentId());
                }
                InputStream inputStream = Files.newInputStream(file.toPath(), StandardOpenOption.READ);
                this.storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
                return inputStream;
            }
            catch (Exception e) {
                this.storageListeners.onStorageEvent(eventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public void deleteLogSegmentData(RemoteLogSegmentMetadata metadata) throws RemoteStorageException {
        this.wrap(() -> {
            LocalTieredStorageEvent.Builder eventBuilder = this.newEventBuilder(LocalTieredStorageEvent.EventType.DELETE_SEGMENT, metadata);
            if (this.deleteEnabled) {
                try {
                    RemoteLogSegmentFileset fileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, metadata);
                    if (!fileset.delete()) {
                        throw new RemoteStorageException("Failed to delete remote log segment with id:" + metadata.remoteLogSegmentId());
                    }
                    this.storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
                }
                catch (Exception e) {
                    this.storageListeners.onStorageEvent(eventBuilder.withException(e).build());
                    throw e;
                }
            }
            return null;
        });
    }

    public void deletePartition(TopicIdPartition partition) throws RemoteStorageException {
        this.wrap(() -> {
            LocalTieredStorageEvent.Builder eventBuilder = this.newEventBuilder(LocalTieredStorageEvent.EventType.DELETE_PARTITION, partition);
            if (this.deleteEnabled) {
                try {
                    RemoteTopicPartitionDirectory partitionDirectory = RemoteTopicPartitionDirectory.openTopicPartitionDirectory(partition, this.storageDirectory);
                    if (!partitionDirectory.delete()) {
                        throw new RemoteStorageException("Failed to delete remote log partition:" + partition);
                    }
                    this.storageListeners.onStorageEvent(eventBuilder.build());
                }
                catch (Exception e) {
                    this.storageListeners.onStorageEvent(eventBuilder.withException(e).build());
                    throw e;
                }
            }
            return null;
        });
    }

    public void close() {
        if (this.deleteOnClose) {
            this.clear();
        }
    }

    public void clear() {
        try {
            File root;
            File[] files = this.storageDirectory.listFiles();
            Optional<File> notADirectory = Arrays.stream(files).filter(f -> !f.isDirectory()).findAny();
            if (notADirectory.isPresent()) {
                this.logger.warn("Found file [{}] which is not a remote topic-partition directory. Stopping the deletion process.", (Object)notADirectory.get());
                return;
            }
            boolean success = Arrays.stream(files).map(dir -> RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory(dir.getName(), this.storageDirectory)).map(RemoteTopicPartitionDirectory::delete).reduce(true, Boolean::logicalAnd);
            if (success) {
                this.storageDirectory.delete();
            }
            if ((root = new File(ROOT_STORAGE_DIR_NAME)).exists() && root.isDirectory() && root.list().length == 0) {
                root.delete();
            }
        }
        catch (Exception e) {
            this.logger.error("Error while deleting remote storage. Stopping the deletion process.", (Throwable)e);
        }
    }

    public LocalTieredStorageHistory getHistory() {
        return this.history;
    }

    String getStorageDirectoryRoot() throws RemoteStorageException {
        return this.wrap(() -> this.storageDirectory.getAbsolutePath());
    }

    private LocalTieredStorageEvent.Builder newEventBuilder(LocalTieredStorageEvent.EventType type, RemoteLogSegmentMetadata md) {
        return LocalTieredStorageEvent.newBuilder(this.brokerId, type, this.eventTimestamp.incrementAndGet(), md.remoteLogSegmentId()).withMetadata(md);
    }

    private LocalTieredStorageEvent.Builder newEventBuilder(LocalTieredStorageEvent.EventType type, TopicIdPartition tpId) {
        return LocalTieredStorageEvent.newBuilder(this.brokerId, type, this.eventTimestamp.incrementAndGet(), new RemoteLogSegmentId(tpId, Uuid.ZERO_UUID));
    }

    private <U> U wrap(Callable<U> f) throws RemoteStorageException {
        if (this.storageDirectory == null) {
            throw new RemoteStorageException("No storage directory was defined for the local remote storage. Make sure the instance was configured correctly via the configure() method.");
        }
        try {
            return f.call();
        }
        catch (RemoteStorageException rse) {
            throw rse;
        }
        catch (FileNotFoundException | NoSuchFileException e) {
            throw new RemoteResourceNotFoundException((Throwable)e);
        }
        catch (Exception e) {
            throw new RemoteStorageException("Internal error in local remote storage", (Throwable)e);
        }
    }

    private static void checkArgument(boolean valid, String message, Object ... args) {
        if (!valid) {
            throw new IllegalArgumentException(message + ": " + Arrays.toString(args));
        }
    }

    private LocalTieredStorageEvent.EventType getEventTypeForFetch(RemoteStorageManager.IndexType indexType) {
        switch (indexType) {
            case OFFSET: {
                return LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
            }
            case TIMESTAMP: {
                return LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
            }
            case PRODUCER_SNAPSHOT: {
                return LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
            }
            case TRANSACTION: {
                return LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
            }
            case LEADER_EPOCH: {
                return LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
            }
        }
        return LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
    }

    private RemoteLogSegmentFileset.RemoteLogSegmentFileType getLogSegmentFileType(RemoteStorageManager.IndexType indexType) {
        switch (indexType) {
            case OFFSET: {
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
            }
            case TIMESTAMP: {
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
            }
            case PRODUCER_SNAPSHOT: {
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
            }
            case TRANSACTION: {
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
            }
            case LEADER_EPOCH: {
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
            }
        }
        return RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
    }

    public int brokerId() {
        return this.brokerId;
    }
}

