package com.microsoft.reef.io.checkpoint.fs;

import com.microsoft.reef.io.checkpoint.CheckpointID;
import com.microsoft.reef.io.checkpoint.CheckpointNamingService;
import com.microsoft.reef.io.checkpoint.CheckpointService;
import com.microsoft.tang.annotations.Name;
import com.microsoft.tang.annotations.NamedParameter;
import com.microsoft.tang.annotations.Parameter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import javax.inject.Inject;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:com/microsoft/reef/io/checkpoint/fs/FSCheckpointService.class */
/**
 * {@link CheckpointService} implementation that stores checkpoints as files on a
 * Hadoop {@link FileSystem}.
 *
 * <p>A checkpoint is first written to a temporary file (the destination name with a
 * {@code .tmp} suffix, see {@link #tmpfile(Path)}) and is renamed to its final
 * destination by {@link #commit(CheckpointService.CheckpointWriteChannel)}.
 * {@link #abort(CheckpointService.CheckpointWriteChannel)} deletes the temporary file
 * instead.
 */
public class FSCheckpointService implements CheckpointService {
    /** Directory under which new checkpoints are created. */
    private final Path base;
    /** File system used for every create/open/rename/delete operation. */
    private final FileSystem fs;
    /** Supplies the (relative) name of each new checkpoint. */
    private final CheckpointNamingService namingPolicy;
    /** Replication factor passed to {@code FileSystem.create} for new checkpoint files. */
    private final short replication;

    /**
     * Read channel over an already opened checkpoint file.
     */
    private static class FSCheckpointReadChannel implements CheckpointService.CheckpointReadChannel {
        private boolean isOpen = true;
        private final ReadableByteChannel in;

        /**
         * @param fSDataInputStream stream of the checkpoint file; wrapped in a channel
         */
        FSCheckpointReadChannel(FSDataInputStream fSDataInputStream) {
            this.in = Channels.newChannel((InputStream) fSDataInputStream);
        }

        /** Reads from the wrapped stream into {@code byteBuffer}. */
        @Override
        public int read(ByteBuffer byteBuffer) throws IOException {
            return this.in.read(byteBuffer);
        }

        /** Marks this channel closed, then closes the wrapped channel. */
        @Override
        public void close() throws IOException {
            // Flip the flag first so the channel reports closed even if the
            // underlying close() throws.
            this.isOpen = false;
            this.in.close();
        }

        @Override
        public boolean isOpen() {
            return this.isOpen;
        }
    }

    /**
     * Write channel for a checkpoint that is still being written. The bytes go to the
     * stream handed to the constructor; the channel additionally remembers the final
     * destination the checkpoint is renamed to on commit.
     */
    public static class FSCheckpointWriteChannel implements CheckpointService.CheckpointWriteChannel {
        private boolean isOpen = true;
        private final Path finalDst;
        private final WritableByteChannel out;

        /**
         * @param path               final destination of the checkpoint
         * @param fSDataOutputStream stream the checkpoint data is written to
         */
        FSCheckpointWriteChannel(Path path, FSDataOutputStream fSDataOutputStream) {
            this.finalDst = path;
            this.out = Channels.newChannel((OutputStream) fSDataOutputStream);
        }

        /** Writes the content of {@code byteBuffer} to the wrapped stream. */
        @Override
        public int write(ByteBuffer byteBuffer) throws IOException {
            return this.out.write(byteBuffer);
        }

        /**
         * @return the final destination path of this checkpoint
         */
        public Path getDestination() {
            return this.finalDst;
        }

        /** Marks this channel closed, then closes the wrapped channel. */
        @Override
        public void close() throws IOException {
            // Flip the flag first so the channel reports closed even if the
            // underlying close() throws.
            this.isOpen = false;
            this.out.close();
        }

        @Override
        public boolean isOpen() {
            return this.isOpen;
        }
    }

    @NamedParameter(doc = "The path to be used to store the checkpoints.")
    static class PATH implements Name<String> {
        PATH() {
        }
    }

    @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3")
    static class REPLICATION_FACTOR implements Name<Short> {
        REPLICATION_FACTOR() {
        }
    }

    /**
     * Injection constructor; identical to the public constructor except that the base
     * directory is given as a string.
     *
     * @param fileSystem              file system the checkpoints are stored on
     * @param str                     base directory for checkpoints (see {@link PATH})
     * @param checkpointNamingService source of new checkpoint names
     * @param s                       replication factor (see {@link REPLICATION_FACTOR})
     */
    @Inject
    FSCheckpointService(FileSystem fileSystem, @Parameter(PATH.class) String str, CheckpointNamingService checkpointNamingService, @Parameter(REPLICATION_FACTOR.class) short s) {
        this(fileSystem, new Path(str), checkpointNamingService, s);
    }

    /**
     * @param fileSystem              file system the checkpoints are stored on
     * @param path                    base directory for checkpoints
     * @param checkpointNamingService source of new checkpoint names
     * @param s                       replication factor used when creating checkpoint files
     */
    public FSCheckpointService(FileSystem fileSystem, Path path, CheckpointNamingService checkpointNamingService, short s) {
        this.fs = fileSystem;
        this.base = path;
        this.namingPolicy = checkpointNamingService;
        this.replication = s;
    }

    /**
     * Starts a new checkpoint named by the naming service and located under the base
     * directory.
     *
     * @return a channel writing to the temporary file of the new checkpoint
     * @throws IOException if the generated name is an absolute path or the file cannot
     *                     be created
     */
    @Override
    public CheckpointService.CheckpointWriteChannel create() throws IOException {
        final Path name = new Path(this.namingPolicy.getNewName());
        if (name.isUriPathAbsolute()) {
            throw new IOException("Checkpoint cannot be an absolute path");
        }
        return createInternal(new Path(this.base, name));
    }

    /**
     * Creates the temporary file for {@code path} and wraps it in a write channel whose
     * destination is {@code path}.
     *
     * @param path final destination of the checkpoint
     * @return the write channel for the new checkpoint
     * @throws IOException if the temporary file cannot be created
     */
    CheckpointService.CheckpointWriteChannel createInternal(Path path) throws IOException {
        return new FSCheckpointWriteChannel(path, this.fs.create(tmpfile(path), this.replication));
    }

    /**
     * Opens a committed checkpoint for reading.
     *
     * @param checkpointID identifier of the checkpoint; must be an {@code FSCheckpointID}
     * @return a channel reading the checkpoint file
     * @throws IllegalArgumentException if {@code checkpointID} is not an {@code FSCheckpointID}
     * @throws IOException              if the file cannot be opened
     */
    @Override
    public CheckpointService.CheckpointReadChannel open(CheckpointID checkpointID) throws IOException, InterruptedException {
        if (!(checkpointID instanceof FSCheckpointID)) {
            throw new IllegalArgumentException("Mismatched checkpoint type: " + checkpointID.getClass());
        }
        return new FSCheckpointReadChannel(this.fs.open(((FSCheckpointID) checkpointID).getPath()));
    }

    /**
     * Closes the channel if it is still open and promotes the temporary file to its
     * final destination by renaming it.
     *
     * <p>If the rename is refused, the temporary file is cleaned up via
     * {@link #abort(CheckpointService.CheckpointWriteChannel)} and an {@link IOException}
     * describing the failed promotion is thrown. An {@link IOException} raised by that
     * cleanup is attached as a suppressed exception so that it does not hide the
     * promotion failure.
     *
     * @param checkpointWriteChannel channel previously returned by {@link #create()}
     * @return the identifier of the committed checkpoint
     * @throws IllegalArgumentException if the channel is not an {@link FSCheckpointWriteChannel}
     * @throws IOException              if closing the channel or the rename fails
     */
    @Override
    public CheckpointID commit(CheckpointService.CheckpointWriteChannel checkpointWriteChannel) throws IOException, InterruptedException {
        // Validate before touching the channel so a foreign channel is rejected
        // the same way open() and delete() reject foreign checkpoint ids.
        final FSCheckpointWriteChannel fsChannel = asFsChannel(checkpointWriteChannel);
        if (fsChannel.isOpen()) {
            fsChannel.close();
        }
        final Path destination = fsChannel.getDestination();
        final Path temporary = tmpfile(destination);
        if (this.fs.rename(temporary, destination)) {
            return new FSCheckpointID(destination);
        }
        final IOException failure = new IOException("Failed to promote checkpoint " + temporary + " -> " + destination);
        try {
            // Best-effort cleanup of the temporary file.
            abort(checkpointWriteChannel);
        } catch (IOException cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
        throw failure;
    }

    /**
     * Closes the channel if it is still open and deletes its temporary file.
     *
     * @param checkpointWriteChannel channel previously returned by {@link #create()}
     * @throws IllegalArgumentException if the channel is not an {@link FSCheckpointWriteChannel}
     * @throws IOException              if closing the channel fails or the file system
     *                                  reports that the temporary file was not deleted
     */
    @Override
    public void abort(CheckpointService.CheckpointWriteChannel checkpointWriteChannel) throws IOException {
        final FSCheckpointWriteChannel fsChannel = asFsChannel(checkpointWriteChannel);
        if (fsChannel.isOpen()) {
            fsChannel.close();
        }
        final Path temporary = tmpfile(fsChannel.getDestination());
        try {
            if (!this.fs.delete(temporary, false)) {
                throw new IOException("Failed to delete checkpoint during abort");
            }
        } catch (FileNotFoundException ignored) {
            // The temporary file is already gone, so there is nothing left to clean up.
        }
    }

    /**
     * Deletes a committed checkpoint.
     *
     * @param checkpointID identifier of the checkpoint; must be an {@code FSCheckpointID}
     * @return the result of the file system delete, or {@code true} if the file system
     *         reports the file as not found
     * @throws IllegalArgumentException if {@code checkpointID} is not an {@code FSCheckpointID}
     * @throws IOException              if the delete fails for another reason
     */
    @Override
    public boolean delete(CheckpointID checkpointID) throws IOException, InterruptedException {
        if (!(checkpointID instanceof FSCheckpointID)) {
            throw new IllegalArgumentException("Mismatched checkpoint type: " + checkpointID.getClass());
        }
        final Path target = ((FSCheckpointID) checkpointID).getPath();
        try {
            return this.fs.delete(target, false);
        } catch (FileNotFoundException ignored) {
            // A checkpoint that no longer exists counts as deleted.
            return true;
        }
    }

    /**
     * Returns the temporary sibling of {@code path}: same parent, name suffixed with
     * {@code .tmp}.
     *
     * @param path final destination of a checkpoint
     * @return the path of the temporary file used while the checkpoint is written
     */
    static final Path tmpfile(Path path) {
        return new Path(path.getParent(), path.getName() + ".tmp");
    }

    /**
     * Narrows a generic write channel to the implementation produced by this service.
     *
     * @param channel channel handed to {@code commit} or {@code abort}
     * @return the same channel, typed as {@link FSCheckpointWriteChannel}
     * @throws IllegalArgumentException if the channel is of a different type
     */
    private static FSCheckpointWriteChannel asFsChannel(CheckpointService.CheckpointWriteChannel channel) {
        if (!(channel instanceof FSCheckpointWriteChannel)) {
            throw new IllegalArgumentException("Mismatched checkpoint type: " + channel.getClass());
        }
        return (FSCheckpointWriteChannel) channel;
    }
}
