package com.datatorrent.stram;

import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.util.FSUtil;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/FSRecoveryHandler.class */
/**
 * {@link StreamingContainerManager.RecoveryHandler} that keeps its files in a
 * {@code recovery} sub-directory on a Hadoop {@link FileSystem}.
 *
 * <p>The directory holds up to five files: the current log ({@value #FILE_LOG}), a rotated log
 * ({@value #FILE_LOG_BACKUP}), the current snapshot ({@value #FILE_SNAPSHOT}), the previous
 * snapshot kept while a new one is being written ({@value #FILE_SNAPSHOT_BACKUP}) and a file
 * holding the connect URI. {@link #restore()} inspects which backup files are present to decide
 * how to repair the directory before reading the snapshot.
 */
public class FSRecoveryHandler implements StreamingContainerManager.RecoveryHandler {
    private static final Logger LOG = LoggerFactory.getLogger(FSRecoveryHandler.class);

    public static final String FILE_LOG = "log";
    public static final String FILE_LOG_BACKUP = "log0";
    public static final String FILE_SNAPSHOT = "snapshot";
    public static final String FILE_SNAPSHOT_BACKUP = "snapshot0";
    private static final String DIRECTORY_RECOVERY = "recovery";
    private static final String FILE_HEARTBEATURI = "heartbeatUri";

    private final Path basedir;
    private final Path logPath;
    private final Path logBackupPath;
    private final FileSystem fs;
    private final Path snapshotPath;
    private final Path snapshotBackupPath;
    private final Path heartbeatPath;

    /**
     * Creates a handler rooted at {@code <parentDir>/recovery}.
     *
     * @param parentDir directory under which the {@code recovery} directory is resolved
     * @param conf Hadoop configuration used to obtain a new {@link FileSystem} instance
     * @throws IOException if the file system instance cannot be created
     */
    public FSRecoveryHandler(String parentDir, Configuration conf) throws IOException {
        this.basedir = new Path(parentDir, DIRECTORY_RECOVERY);
        // newInstance: this handler owns the FileSystem and closes it in finalize().
        this.fs = FileSystem.newInstance(this.basedir.toUri(), conf);
        this.logPath = new Path(this.basedir, FILE_LOG);
        this.logBackupPath = new Path(this.basedir, FILE_LOG_BACKUP);
        this.snapshotPath = new Path(this.basedir, FILE_SNAPSHOT);
        this.snapshotBackupPath = new Path(this.basedir, FILE_SNAPSHOT_BACKUP);
        this.heartbeatPath = new Path(this.basedir, FILE_HEARTBEATURI);
    }

    /**
     * Returns the recovery directory as a URI string.
     *
     * @return string form of the recovery directory URI
     */
    public String getDir() {
        return this.basedir.toUri().toString();
    }

    /**
     * Moves the current log (if any) to the log backup and opens a fresh log for writing.
     *
     * <p>The returned stream calls {@code hflush()} on the underlying Hadoop stream on every
     * {@code flush()}.
     *
     * @return stream writing to the new log file
     * @throws AssertionError if a log backup already exists
     * @throws IOException if the existing log cannot be renamed or the new log cannot be created
     */
    @Override // com.datatorrent.stram.StreamingContainerManager.RecoveryHandler
    public DataOutputStream rotateLog() throws IOException {
        if (this.fs.exists(this.logBackupPath)) {
            throw new AssertionError("Snapshot state prior to log rotation: " + this.logBackupPath);
        }
        if (this.fs.exists(this.logPath)) {
            LOG.debug("Creating log backup {}", this.logBackupPath);
            if (!this.fs.rename(this.logPath, this.logBackupPath)) {
                throw new IOException("Failed to rotate log: " + this.logPath);
            }
        }
        LOG.info("Creating {}", this.logPath);
        final FSDataOutputStream fsOut = openLogForWrite();
        return new DataOutputStream(fsOut) {
            @Override
            public void flush() throws IOException {
                super.flush();
                fsOut.hflush();
            }

            @Override
            public void close() throws IOException {
                FSRecoveryHandler.LOG.debug("Closing {}", FSRecoveryHandler.this.logPath);
                super.close();
            }
        };
    }

    /** Opens the log file for writing, bypassing the Hadoop file system for the {@code file} scheme. */
    private FSDataOutputStream openLogForWrite() throws IOException {
        String scheme = null;
        try {
            scheme = this.fs.getScheme();
        } catch (UnsupportedOperationException e) {
            // Treated as "not local": fall through to fs.create() below.
            LOG.warn("{} doesn't implement getScheme() method", this.fs.getClass().getName());
        }
        if ("file".equals(scheme)) {
            // NOTE(review): presumably the local file system wrapper does not push data through
            // on hflush(), hence the direct FileOutputStream - confirm.
            FSUtil.mkdirs(this.fs, this.logPath.getParent());
            String localFile = Path.getPathWithoutSchemeAndAuthority(this.logPath).toString();
            return new FSDataOutputStream(new FileOutputStream(localFile), (FileSystem.Statistics) null);
        }
        return this.fs.create(this.logPath);
    }

    /**
     * Opens the current log for reading.
     *
     * @return stream over the log file, or an empty stream when no log file exists
     * @throws AssertionError if a log backup exists
     * @throws IOException if the log cannot be opened
     */
    @Override // com.datatorrent.stram.StreamingContainerManager.RecoveryHandler
    public DataInputStream getLog() throws IOException {
        if (this.fs.exists(this.logBackupPath)) {
            throw new AssertionError("Restore state prior to reading log: " + this.logBackupPath);
        }
        if (this.fs.exists(this.logPath)) {
            LOG.debug("Opening existing log ({})", this.logPath);
            return this.fs.open(this.logPath);
        }
        LOG.debug("No existing log ({})", this.logPath);
        return new DataInputStream(new ByteArrayInputStream(new byte[0]));
    }

    /**
     * Writes {@code state} as the new snapshot using Java serialization.
     *
     * <p>An existing snapshot is first renamed to the snapshot backup. Once the new snapshot has
     * been written and closed, the snapshot backup and the log backup are removed.
     *
     * @param state object to serialize
     * @throws IllegalStateException if a snapshot backup already exists
     * @throws IOException if the backup rename, the write, or the removal of a backup fails
     */
    @Override // com.datatorrent.stram.StreamingContainerManager.RecoveryHandler
    public void save(Object state) throws IOException {
        if (this.fs.exists(this.snapshotBackupPath)) {
            throw new IllegalStateException("Found previous backup " + this.snapshotBackupPath);
        }
        if (this.fs.exists(this.snapshotPath)) {
            LOG.debug("Backup {} to {}", this.snapshotPath, this.snapshotBackupPath);
            // A failed rename must abort: create() below would otherwise overwrite the only
            // existing snapshot with nothing to fall back to.
            if (!this.fs.rename(this.snapshotPath, this.snapshotBackupPath)) {
                throw new IOException("Failed to back up " + this.snapshotPath + " to " + this.snapshotBackupPath);
            }
        }
        LOG.debug("Writing checkpoint to {}", this.snapshotPath);
        try (FSDataOutputStream fsOut = this.fs.create(this.snapshotPath);
             ObjectOutputStream objectOut = new ObjectOutputStream(fsOut)) {
            objectOut.writeObject(state);
        }
        // Backups are dropped only after the new snapshot is completely written and closed.
        removeIfExists(this.snapshotBackupPath);
        removeIfExists(this.logBackupPath);
    }

    /** Deletes {@code path} (non-recursively) when it exists; fails if the delete is refused. */
    private void removeIfExists(Path path) throws IOException {
        if (this.fs.exists(path) && !this.fs.delete(path, false)) {
            throw new IOException("Failed to remove " + path);
        }
    }

    /**
     * Repairs the recovery directory if backup files are present, then reads the snapshot.
     *
     * <p>If a snapshot backup exists, it replaces the snapshot, and the log backup followed by the
     * current log are concatenated into a new log. Otherwise, if only a log backup exists, it is
     * renamed back to the log.
     *
     * @return the deserialized snapshot, or {@code null} when no snapshot file exists
     * @throws IOException on file system errors or when a class in the snapshot cannot be resolved
     */
    @Override // com.datatorrent.stram.StreamingContainerManager.RecoveryHandler
    public Object restore() throws IOException {
        FileContext fileContext = FileContext.getFileContext(this.fs.getUri());
        if (fileContext.util().exists(this.snapshotBackupPath)) {
            LOG.warn("Incomplete checkpoint, reverting to {}", this.snapshotBackupPath);
            fileContext.rename(this.snapshotBackupPath, this.snapshotPath, Options.Rename.OVERWRITE);
            // Concatenate into a temporary file and move it into place afterwards.
            Path combinedLogPath = new Path(this.basedir, "log.combined");
            try (FSDataOutputStream combined =
                     fileContext.create(combinedLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
                appendFile(fileContext, this.logBackupPath, combined);
                appendFile(fileContext, this.logPath, combined);
            }
            // Rename only after the combined file has been closed.
            fileContext.rename(combinedLogPath, this.logPath, Options.Rename.OVERWRITE);
            fileContext.delete(this.logBackupPath, false);
        } else if (fileContext.util().exists(this.logBackupPath)) {
            LOG.warn("Found {}, did checkpointing fail?", this.logBackupPath);
            fileContext.rename(this.logBackupPath, this.logPath, Options.Rename.OVERWRITE);
        }
        if (!fileContext.util().exists(this.snapshotPath)) {
            LOG.debug("No existing checkpoint.");
            return null;
        }
        LOG.debug("Reading checkpoint {}", this.snapshotPath);
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try (FSDataInputStream snapshotIn = fileContext.open(this.snapshotPath);
             ObjectInputStream objectIn = new ObjectInputStream(snapshotIn) {
                 @Override
                 protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
                     // Resolve against the thread's context class loader first.
                     try {
                         return Class.forName(desc.getName(), true, contextClassLoader);
                     } catch (ClassNotFoundException e) {
                         // Class.forName cannot resolve primitive type names; the default
                         // implementation can.
                         return super.resolveClass(desc);
                     }
                 }
             }) {
            return objectIn.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to read checkpointed state", e);
        }
    }

    /** Copies the full content of {@code source} into {@code target}, closing only the source. */
    private static void appendFile(FileContext fileContext, Path source, FSDataOutputStream target) throws IOException {
        try (FSDataInputStream in = fileContext.open(source)) {
            IOUtils.copy(in, target);
        }
    }

    /**
     * Writes the connect URI to the heartbeat URI file, overwriting any existing file.
     *
     * @param uri text to store, encoded as UTF-8
     * @throws IOException if the file cannot be written
     */
    public void writeConnectUri(String uri) throws IOException {
        try (FSDataOutputStream out = this.fs.create(this.heartbeatPath, true)) {
            out.write(uri.getBytes(StandardCharsets.UTF_8));
        }
        LOG.debug("Connect address: {} written to {} ", uri, this.heartbeatPath);
    }

    /**
     * Reads the connect URI from the heartbeat URI file.
     *
     * @return the file content decoded as UTF-8
     * @throws IOException if the file cannot be opened or read
     */
    public String readConnectUri() throws IOException {
        byte[] content;
        try (FSDataInputStream in = this.fs.open(this.heartbeatPath)) {
            content = IOUtils.toByteArray(in);
        }
        String uri = new String(content, StandardCharsets.UTF_8);
        LOG.debug("Connect address: {} from {} ", uri, this.heartbeatPath);
        return uri;
    }

    /** Closes the file system instance owned by this handler. */
    @Override
    protected void finalize() throws Throwable {
        try {
            this.fs.close();
        } finally {
            super.finalize();
        }
    }
}
