/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBFoldingState;
import org.apache.flink.contrib.streaming.state.RocksDBListState;
import org.apache.flink.contrib.streaming.state.RocksDBReducingState;
import org.apache.flink.contrib.streaming.state.RocksDBValueState;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.util.HDFSCopyFromLocal;
import org.apache.flink.streaming.util.HDFSCopyToLocal;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.ReadOptions;
import org.rocksdb.RestoreOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBStateBackend
extends AbstractStateBackend {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
    // Base checkpoint location; getCheckpointPath() appends job id, operator id and state name.
    private final Path checkpointDirectory;
    // Backend that lifecycle calls and non-partitioned checkpoint operations are delegated to.
    private final AbstractStateBackend nonPartitionedStateBackend;
    // When true, snapshotPartitionedState() uses the fully asynchronous snapshot path.
    private boolean fullyAsyncBackup = false;
    // Operator identifier with spaces removed; assigned in initializeForJob().
    private String operatorIdentifier;
    // Job id taken from the Environment in initializeForJob().
    private JobID jobId;
    // Local base paths set through setDbStoragePaths(); null means the IOManager's spilling directories are used.
    private Path[] configuredDbBasePaths;
    // Base directories resolved in initializeForJob().
    private File[] initializedDbBasePaths;
    // Index into initializedDbBasePaths, advanced round-robin by getNextStoragePath().
    private int nextDirectory;
    // Source of the initial DBOptions / ColumnFamilyOptions.
    private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT_ROCKS_4_5_1;
    // Optional factory applied on top of the predefined options; may be null.
    private OptionsFactory optionsFactory;
    // Lazily created by getDbOptions().
    private transient DBOptions dbOptions;
    // Lazily created by getColumnOptions().
    private transient ColumnFamilyOptions columnOptions;
    // Local directory of this backend instance (named after job, operator and subtask index).
    private transient File instanceBasePath;
    // The "db" sub-directory of instanceBasePath in which RocksDB is opened.
    private transient File instanceRocksDBPath;
    // Checkpoint path computed for the "dummy_state" name in initializeForJob().
    private transient String instanceCheckpointPath;
    // The RocksDB instance; opened in initializeForJob() and set to null by dispose().
    protected volatile transient RocksDB db;
    // Monitor held while the database is disposed and while a fully asynchronous snapshot iterates over it.
    private final SerializableObject dbCleanupLock = new SerializableObject();
    // State name -> (column family handle, state descriptor) for every state known to this backend.
    private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;

    /**
     * Creates a backend from a checkpoint location given as a string.
     *
     * <p>The string is wrapped in a {@link Path}, converted to a {@link URI} and passed on to the
     * URI-based constructor.
     *
     * @param checkpointDataUri location of the checkpoint data
     * @throws IOException declared by the delegated constructor
     */
    public RocksDBStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    /**
     * Creates a backend that uses an {@link FsStateBackend} rooted at the given URI for
     * non-partitioned state, and that backend's base path as the checkpoint directory.
     *
     * @param checkpointDataUri location of the checkpoint data
     * @throws IOException declared for failures while setting up the file system backend
     */
    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        FsStateBackend fileBackend = new FsStateBackend(checkpointDataUri);
        checkpointDirectory = fileBackend.getBasePath();
        nonPartitionedStateBackend = fileBackend;
    }

    /**
     * Creates a backend from a checkpoint location given as a string and an explicit backend for
     * non-partitioned state.
     *
     * <p>The string is wrapped in a {@link Path}, converted to a {@link URI} and passed on to the
     * URI-based constructor.
     *
     * @param checkpointDataUri location of the checkpoint data
     * @param nonPartitionedStateBackend backend that handles non-partitioned state
     * @throws IOException declared by the delegated constructor
     */
    public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
        this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
    }

    /**
     * Creates a backend with an explicit backend for non-partitioned state.
     *
     * @param checkpointDataUri location of the checkpoint data; validated and normalized through
     *     {@link FsStateBackend#validateAndNormalizeUri}
     * @param nonPartitionedStateBackend backend that handles non-partitioned state; must not be null
     * @throws IOException declared for failures while validating the URI
     * @throws NullPointerException if {@code nonPartitionedStateBackend} is null
     */
    public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
        // The null check runs first so that a missing backend is reported before the URI is inspected.
        this.nonPartitionedStateBackend = Objects.requireNonNull(nonPartitionedStateBackend);
        checkpointDirectory = FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
    }

    /**
     * Prepares this backend for a job: initializes the delegate backend, resolves the local
     * storage directories, picks an instance directory and opens a fresh RocksDB instance in it.
     *
     * @param env environment providing the job id, IO manager and task info
     * @param operatorIdentifier identifier of the operator; stored with spaces removed
     * @param keySerializer serializer for the keys, forwarded to the parent and delegate backends
     * @throws Exception if none of the configured local directories is usable, or if the parent
     *     or delegate initialization fails
     */
    public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {
        super.initializeForJob(env, operatorIdentifier, keySerializer);
        nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
        this.operatorIdentifier = operatorIdentifier.replace(" ", "");
        jobId = env.getJobID();

        if (configuredDbBasePaths != null) {
            // Probe every configured directory by creating (and removing) a uniquely named sub-directory.
            List<File> usableDirs = new ArrayList<File>(configuredDbBasePaths.length);
            StringBuilder failures = new StringBuilder();
            for (Path configured : configuredDbBasePaths) {
                File baseDir = new File(configured.toUri().getPath());
                File probe = new File(baseDir, UUID.randomUUID().toString());
                if (probe.mkdirs()) {
                    usableDirs.add(baseDir);
                } else {
                    String msg = "Local DB files directory '" + configured + "' does not exist and cannot be created. ";
                    LOG.error(msg);
                    failures.append(msg);
                }
                probe.delete();
            }
            if (usableDirs.isEmpty()) {
                throw new Exception("No local storage directories available. " + failures);
            }
            initializedDbBasePaths = usableDirs.toArray(new File[usableDirs.size()]);
        } else {
            // Nothing configured: fall back to the spilling directories of the IO manager.
            initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
        }

        // Start the round-robin at a random position.
        nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
        File storageDir = getNextStoragePath();
        instanceBasePath = new File(storageDir, "job-" + jobId.toString() + "_op-" + operatorIdentifier + "-" + env.getTaskInfo().getIndexOfThisSubtask());
        instanceCheckpointPath = getCheckpointPath("dummy_state");
        instanceRocksDBPath = new File(instanceBasePath, "db");
        RocksDB.loadLibrary();

        if (!(instanceBasePath.exists() || instanceBasePath.mkdirs())) {
            throw new RuntimeException("Could not create RocksDB data directory.");
        }
        try {
            if (instanceRocksDBPath.exists()) {
                LOG.warn("Deleting already existing db directory {}.", (Object) instanceRocksDBPath);
                FileUtils.deleteDirectory(instanceRocksDBPath);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error cleaning RocksDB data directory.", e);
        }

        // A new database only contains the default column family.
        List<ColumnFamilyDescriptor> descriptors = new ArrayList<ColumnFamilyDescriptor>(1);
        descriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
        List<ColumnFamilyHandle> handles = new ArrayList<ColumnFamilyHandle>(1);
        try {
            db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), descriptors, handles);
        } catch (RocksDBException e) {
            throw new RuntimeException("Error while opening RocksDB instance.", e);
        }
        kvStateInformation = new HashMap<String, Tuple2<ColumnFamilyHandle, StateDescriptor>>();
    }

    /**
     * Delegates to the backend for non-partitioned state.
     *
     * @throws Exception if the delegate fails
     */
    public void disposeAllStateForCurrentJob() throws Exception {
        nonPartitionedStateBackend.disposeAllStateForCurrentJob();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Releases everything held by this backend: the parent state, the delegate backend, the
     * native RocksDB objects and the local instance directory.
     *
     * <p>Every step is attempted even if an earlier one failed. The first failure is rethrown at
     * the end and later failures are attached to it as suppressed exceptions.
     *
     * @throws Exception the first failure encountered, wrapped in an {@link Exception} if it is
     *     neither an {@link Exception} nor an {@link Error}
     */
    public void dispose() throws Exception {
        Throwable exception = null;
        try {
            super.dispose();
        } catch (Throwable t) {
            exception = t;
        }
        try {
            nonPartitionedStateBackend.dispose();
        } catch (Throwable t) {
            exception = recordDisposeFailure(exception, t);
        }
        try {
            // The lock keeps the database from disappearing while an asynchronous snapshot iterates over it.
            synchronized (dbCleanupLock) {
                if (db != null) {
                    if (dbOptions != null) {
                        dbOptions.dispose();
                        dbOptions = null;
                    }
                    for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
                        column.f0.dispose();
                    }
                    db.dispose();
                    db = null;
                }
            }
        } catch (Throwable t) {
            exception = recordDisposeFailure(exception, t);
        }
        try {
            // instanceBasePath is only assigned in initializeForJob(); nothing to delete before that.
            if (instanceBasePath != null) {
                org.apache.flink.util.FileUtils.deleteDirectory(instanceBasePath);
            }
        } catch (Throwable t) {
            exception = recordDisposeFailure(exception, t);
        }
        if (exception != null) {
            if (exception instanceof Exception) {
                throw (Exception) exception;
            }
            if (exception instanceof Error) {
                throw (Error) exception;
            }
            throw new Exception(exception.getMessage(), exception);
        }
    }

    /**
     * Combines a new failure with the failures collected so far.
     *
     * <p>A throwable must never be added as its own suppressed exception, because
     * {@link Throwable#addSuppressed} rejects that with an {@link IllegalArgumentException},
     * which would abort the remaining cleanup steps.
     *
     * @param first the failure collected so far, or null if there is none
     * @param next the failure that just occurred
     * @return the failure to keep as the primary one
     */
    private static Throwable recordDisposeFailure(Throwable first, Throwable next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Closes the backend for non-partitioned state.
     *
     * @throws IOException if the delegate fails to close
     */
    public void close() throws IOException {
        nonPartitionedStateBackend.close();
    }

    /**
     * Builds the checkpoint location for a state as
     * {@code <checkpointDirectory>/<jobId>/<operatorIdentifier>/<stateName>}.
     *
     * @param stateName name of the state
     * @return the slash-separated checkpoint path
     */
    private String getCheckpointPath(String stateName) {
        StringBuilder location = new StringBuilder();
        location.append(checkpointDirectory).append("/");
        location.append(jobId.toString()).append("/");
        location.append(operatorIdentifier).append("/");
        location.append(stateName);
        return location.toString();
    }

    /**
     * Advances the round-robin index over the initialized base directories and returns the
     * directory at the new position, wrapping around to the first entry after the last.
     *
     * @return the next local base directory
     */
    private File getNextStoragePath() {
        int candidate = nextDirectory + 1;
        if (candidate >= initializedDbBasePaths.length) {
            candidate = 0;
        }
        nextDirectory = candidate;
        return initializedDbBasePaths[candidate];
    }

    /**
     * Returns the base directories resolved by {@code initializeForJob}.
     *
     * @return the internal array of initialized base directories (not a copy)
     */
    public File[] getStoragePaths() {
        return initializedDbBasePaths;
    }

    /**
     * Snapshots the partitioned state, using either the fully or the semi asynchronous mode.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return a map holding the snapshot, or null if no key/value state has been registered
     * @throws Exception if the snapshot cannot be taken
     */
    public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
        boolean nothingToSnapshot = keyValueStatesByName == null || keyValueStatesByName.size() == 0;
        if (nothingToSnapshot) {
            return null;
        }
        return fullyAsyncBackup
                ? performFullyAsyncSnapshot(checkpointId, timestamp)
                : performSemiAsyncSnapshot(checkpointId, timestamp);
    }

    /**
     * Takes a semi asynchronous snapshot: a RocksDB backup is written synchronously into a local
     * directory, and a {@code SemiAsyncSnapshot} describing that backup is returned.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint (not used here)
     * @return a map with the snapshot stored under the key {@code "dummy_state"}
     * @throws Exception if the local backup directory or the backup itself cannot be created
     */
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
        File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
        URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
        if (!(localBackupPath.exists() || localBackupPath.mkdirs())) {
            throw new RuntimeException("Could not create local backup path " + localBackupPath);
        }

        long startTime = System.currentTimeMillis();
        BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
        backupOptions.setBackupLogFiles(false);
        backupOptions.setSync(false);
        try (BackupEngine engine = BackupEngine.open(Env.getDefault(), backupOptions)) {
            engine.createNewBackup(db, true);
        }
        long elapsedMillis = System.currentTimeMillis() - startTime;
        LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + elapsedMillis + " ms.");

        // Only the descriptors are handed to the snapshot, not the column family handles.
        ArrayList<Object> kvStateInformationCopy = new ArrayList<Object>();
        for (Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo : kvStateInformation.values()) {
            kvStateInformationCopy.add(stateInfo.f1);
        }

        SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath, backupUri, kvStateInformationCopy, checkpointId);
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>();
        snapshots.put("dummy_state", (KvStateSnapshot<?, ?, ?, ?, ?>) dummySnapshot);
        return snapshots;
    }

    /**
     * Takes a fully asynchronous snapshot: only a RocksDB snapshot handle is obtained here, and
     * a {@code FullyAsyncSnapshot} carrying it and a copy of the known column families is returned.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint (not used here)
     * @return a map with the snapshot stored under the key {@code "dummy_state"}
     * @throws Exception if the backup URI cannot be built
     */
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
        URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

        long startTime = System.currentTimeMillis();
        Snapshot rocksSnapshot = db.getSnapshot();
        long elapsedMillis = System.currentTimeMillis() - startTime;
        LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + elapsedMillis + " ms.");

        // Copy the mapping so that later registrations do not show up in this snapshot.
        HashMap<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy =
                new HashMap<String, Tuple2<ColumnFamilyHandle, StateDescriptor>>(kvStateInformation);

        FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(rocksSnapshot, this, backupUri, columnFamiliesCopy, checkpointId);
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>();
        snapshots.put("dummy_state", (KvStateSnapshot<?, ?, ?, ?, ?>) dummySnapshot);
        return snapshots;
    }

    /**
     * Restores the partitioned state from the snapshot stored under the key
     * {@code "dummy_state"}, dispatching on the snapshot type.
     *
     * @param keyValueStateSnapshots snapshots to restore from; null means nothing to restore
     * @throws Exception if restoring fails
     * @throws RuntimeException if the entry is missing or of an unknown type
     */
    public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
        if (keyValueStateSnapshots != null) {
            KvStateSnapshot restored = keyValueStateSnapshots.get("dummy_state");
            if (restored instanceof FinalSemiAsyncSnapshot) {
                restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) restored);
                return;
            }
            if (restored instanceof FinalFullyAsyncSnapshot) {
                restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) restored);
                return;
            }
            throw new RuntimeException("Unknown RocksDB snapshot: " + restored);
        }
    }

    /**
     * Restores the database from a semi asynchronous snapshot.
     *
     * <p>The current database is disposed and its directory removed, the backup is copied from
     * the snapshot's backup URI into the instance directory, RocksDB is restored from that backup
     * and then reopened with one column family per state descriptor of the snapshot.
     *
     * @param snapshot the snapshot to restore from
     * @throws Exception if copying the backup fails
     * @throws RuntimeException if directories cannot be prepared or RocksDB cannot be restored or opened
     */
    private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot snapshot) throws Exception {
        if (!(instanceBasePath.exists() || instanceBasePath.mkdirs())) {
            throw new RuntimeException("Could not create RocksDB data directory.");
        }
        db.dispose();
        try {
            if (instanceRocksDBPath.exists()) {
                LOG.warn("Deleting already existing db directory {}.", (Object) instanceRocksDBPath);
                FileUtils.deleteDirectory(instanceRocksDBPath);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error cleaning RocksDB data directory.", e);
        }

        File localBackupPath = new File(instanceBasePath, "chk-" + snapshot.checkpointId);
        if (localBackupPath.exists()) {
            try {
                LOG.warn("Deleting already existing local backup directory {}.", (Object) localBackupPath);
                FileUtils.deleteDirectory(localBackupPath);
            } catch (IOException e) {
                throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
            }
        }
        HDFSCopyToLocal.copyToLocal((URI) snapshot.backupUri, (File) instanceBasePath);

        try (BackupEngine engine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
            // Database files and write-ahead log are restored into the same directory.
            engine.restoreDbFromLatestBackup(instanceRocksDBPath.getAbsolutePath(), instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true));
        } catch (IllegalArgumentException | RocksDBException e) {
            throw new RuntimeException("Error while restoring RocksDB state from " + localBackupPath, e);
        } finally {
            // The local copy of the backup is no longer needed, whether or not the restore worked.
            try {
                FileUtils.deleteDirectory(localBackupPath);
            } catch (IOException e) {
                LOG.error("Error cleaning up local restore directory " + localBackupPath, (Throwable) e);
            }
        }

        int stateCount = snapshot.stateDescriptors.size();
        List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<ColumnFamilyDescriptor>(stateCount);
        for (StateDescriptor restoredDescriptor : snapshot.stateDescriptors) {
            familyDescriptors.add(new ColumnFamilyDescriptor(restoredDescriptor.getName().getBytes(), getColumnOptions()));
        }
        // The default column family goes last so that handle i belongs to state descriptor i.
        familyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
        List<ColumnFamilyHandle> familyHandles = new ArrayList<ColumnFamilyHandle>(stateCount);
        try {
            db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), familyDescriptors, familyHandles);
            kvStateInformation = new HashMap<String, Tuple2<ColumnFamilyHandle, StateDescriptor>>();
            for (int i = 0; i < snapshot.stateDescriptors.size(); ++i) {
                StateDescriptor restoredDescriptor = (StateDescriptor) snapshot.stateDescriptors.get(i);
                kvStateInformation.put(
                        restoredDescriptor.getName(),
                        new Tuple2<ColumnFamilyHandle, StateDescriptor>(familyHandles.get(i), restoredDescriptor));
            }
        } catch (RocksDBException e) {
            throw new RuntimeException("Error while opening RocksDB instance.", e);
        }
    }

    /**
     * Restores the database contents from a fully asynchronous snapshot.
     *
     * <p>The stream starts with the number of column families, followed by one
     * (id byte, serialized state descriptor) pair per family. After that come
     * (id byte, key, value) records until the end of the stream.
     *
     * @param snapshot the snapshot to restore from
     * @throws Exception if reading the stream or writing to RocksDB fails
     */
    private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception {
        DataInputView in = (DataInputView) snapshot.stateHandle.getState(userCodeClassLoader);
        kvStateInformation.clear();

        int columnCount = in.readInt();
        Map<Byte, StateDescriptor> descriptorsById = new HashMap<Byte, StateDescriptor>(columnCount);
        for (int remaining = columnCount; remaining > 0; --remaining) {
            byte columnId = in.readByte();
            InstantiationUtil.ClassLoaderObjectInputStream descriptorIn =
                    new InstantiationUtil.ClassLoaderObjectInputStream((InputStream) new DataInputViewStream(in), userCodeClassLoader);
            StateDescriptor restoredDescriptor = (StateDescriptor) descriptorIn.readObject();
            descriptorsById.put(columnId, restoredDescriptor);
            // Registers the column family if it is not known yet.
            getColumnFamily(restoredDescriptor);
        }

        try {
            for (;;) {
                byte columnId = in.readByte();
                ColumnFamilyHandle family = getColumnFamily((StateDescriptor) descriptorsById.get(columnId));
                byte[] rawKey = BytePrimitiveArraySerializer.INSTANCE.deserialize(in);
                byte[] rawValue = BytePrimitiveArraySerializer.INSTANCE.deserialize(in);
                db.put(family, rawKey, rawValue);
            }
        } catch (EOFException endOfStream) {
            // Running off the end of the stream is the regular way this loop terminates.
        }
    }

    /**
     * Returns the column family registered under the descriptor's name, creating and registering
     * a new one if the name is not known yet.
     *
     * @param descriptor descriptor of the state
     * @return the column family handle for the state
     * @throws RuntimeException if the name is already registered with a descriptor that is not
     *     equal to the given one, or if RocksDB fails to create the column family
     */
    protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
        Tuple2<ColumnFamilyHandle, StateDescriptor> registered = kvStateInformation.get(descriptor.getName());
        if (registered == null) {
            ColumnFamilyDescriptor familyDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), getColumnOptions());
            ColumnFamilyHandle created;
            try {
                created = db.createColumnFamily(familyDescriptor);
                kvStateInformation.put(descriptor.getName(), new Tuple2<ColumnFamilyHandle, StateDescriptor>(created, descriptor));
            } catch (RocksDBException e) {
                throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
            }
            return created;
        }
        if (registered.f1.equals(descriptor)) {
            return registered.f0;
        }
        throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + registered.f1 + " trying access with " + descriptor);
    }

    /**
     * Returns the value of the inherited {@code currentKey} field.
     *
     * @return the current key
     */
    public Object currentKey() {
        return currentKey;
    }

    /**
     * Returns the value of the inherited {@code keySerializer} field.
     *
     * @return the key serializer
     */
    public TypeSerializer keySerializer() {
        return keySerializer;
    }

    /**
     * Creates a {@link RocksDBValueState} on the column family that belongs to the descriptor.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state
     * @return the new value state
     * @throws Exception declared by the overridden contract
     */
    protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
        return new RocksDBValueState(getColumnFamily((StateDescriptor) stateDesc), namespaceSerializer, stateDesc, this);
    }

    /**
     * Creates a {@link RocksDBListState} on the column family that belongs to the descriptor.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state
     * @return the new list state
     * @throws Exception declared by the overridden contract
     */
    protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        return new RocksDBListState(getColumnFamily((StateDescriptor) stateDesc), namespaceSerializer, stateDesc, this);
    }

    /**
     * Creates a {@link RocksDBReducingState} on the column family that belongs to the descriptor.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state
     * @return the new reducing state
     * @throws Exception declared by the overridden contract
     */
    protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        return new RocksDBReducingState(getColumnFamily((StateDescriptor) stateDesc), namespaceSerializer, stateDesc, this);
    }

    /**
     * Creates a {@link RocksDBFoldingState} on the column family that belongs to the descriptor.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state
     * @return the new folding state
     * @throws Exception declared by the overridden contract
     */
    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        return new RocksDBFoldingState(getColumnFamily((StateDescriptor) stateDesc), namespaceSerializer, stateDesc, this);
    }

    /**
     * Obtains a checkpoint output stream from the backend for non-partitioned state.
     *
     * @param checkpointID id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return the stream created by the delegate
     * @throws Exception if the delegate fails
     */
    public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
        return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
    }

    /**
     * Checkpoints a serializable object through the backend for non-partitioned state.
     *
     * @param state the object to checkpoint
     * @param checkpointID id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return the handle returned by the delegate
     * @throws Exception if the delegate fails
     */
    public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
        return nonPartitionedStateBackend.checkpointStateSerializable(state, checkpointID, timestamp);
    }

    /**
     * Makes subsequent partitioned-state snapshots use the fully asynchronous mode.
     */
    public void enableFullyAsyncSnapshots() {
        fullyAsyncBackup = true;
    }

    /**
     * Makes subsequent partitioned-state snapshots use the semi asynchronous mode.
     */
    public void disableFullyAsyncSnapshots() {
        fullyAsyncBackup = false;
    }

    /**
     * Configures a single local directory for the RocksDB files.
     *
     * @param path the directory, or null to clear the configuration
     * @throws IllegalArgumentException if the path has a non-local scheme
     */
    public void setDbStoragePath(String path) {
        // A null path must reach the varargs method as a null array, not as an array holding null.
        setDbStoragePaths(path == null ? null : new String[] {path});
    }

    /**
     * Configures the local directories for the RocksDB files.
     *
     * <p>Nothing is stored unless every path is accepted.
     *
     * @param paths the directories, or null to clear the configuration
     * @throws IllegalArgumentException if the array is empty, contains null, or contains a path
     *     whose scheme is neither absent nor {@code file}
     */
    public void setDbStoragePaths(String ... paths) {
        if (paths == null) {
            configuredDbBasePaths = null;
            return;
        }
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        Path[] parsed = new Path[paths.length];
        int position = 0;
        for (String raw : paths) {
            if (raw == null) {
                throw new IllegalArgumentException("null path");
            }
            Path candidate = new Path(raw);
            parsed[position++] = candidate;
            String scheme = candidate.toUri().getScheme();
            if (scheme != null && !scheme.equalsIgnoreCase("file")) {
                throw new IllegalArgumentException("Path " + raw + " has a non local scheme");
            }
        }
        configuredDbBasePaths = parsed;
    }

    /**
     * Returns the configured local directories as strings.
     *
     * @return a new array with one string per configured path, or null if nothing is configured
     */
    public String[] getDbStoragePaths() {
        if (configuredDbBasePaths == null) {
            return null;
        }
        String[] rendered = new String[configuredDbBasePaths.length];
        int position = 0;
        for (Path configured : configuredDbBasePaths) {
            rendered[position++] = configured.toString();
        }
        return rendered;
    }

    /**
     * Sets the predefined options from which the RocksDB options are created.
     *
     * @param options the predefined options; must not be null
     * @throws NullPointerException if {@code options} is null
     */
    public void setPredefinedOptions(PredefinedOptions options) {
        predefinedOptions = Objects.requireNonNull(options);
    }

    /**
     * Returns the predefined options currently in use.
     *
     * @return the predefined options
     */
    public PredefinedOptions getPredefinedOptions() {
        return predefinedOptions;
    }

    /**
     * Sets the factory that is applied on top of the predefined options when the RocksDB options
     * are created.
     *
     * @param optionsFactory the factory to use; null is accepted and means no factory is applied
     */
    public void setOptions(OptionsFactory optionsFactory) {
        this.optionsFactory = optionsFactory;
    }

    /**
     * Returns the configured options factory.
     *
     * @return the options factory, or null if none has been set
     */
    public OptionsFactory getOptions() {
        return optionsFactory;
    }

    /**
     * Returns the database options, creating and caching them on first use.
     *
     * <p>The options start from the predefined options, are passed through the options factory
     * if one is set, and always have {@code createIfMissing} switched on.
     *
     * @return the cached database options
     */
    public DBOptions getDbOptions() {
        if (dbOptions != null) {
            return dbOptions;
        }
        DBOptions created = predefinedOptions.createDBOptions();
        if (optionsFactory != null) {
            created = optionsFactory.createDBOptions(created);
        }
        dbOptions = created.setCreateIfMissing(true);
        return dbOptions;
    }

    /**
     * Returns the column family options, creating and caching them on first use.
     *
     * <p>The options start from the predefined options and are passed through the options
     * factory if one is set.
     *
     * @return the cached column family options
     */
    public ColumnFamilyOptions getColumnOptions() {
        if (columnOptions != null) {
            return columnOptions;
        }
        ColumnFamilyOptions created = predefinedOptions.createColumnOptions();
        if (optionsFactory != null) {
            created = optionsFactory.createColumnOptions(created);
        }
        columnOptions = created;
        return columnOptions;
    }

    /**
     * Result of materializing a fully asynchronous snapshot: a handle to the written data plus
     * the checkpoint id. Restoring happens in the enclosing backend, not through
     * {@link #restoreState}.
     */
    private static class FinalFullyAsyncSnapshot
    implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1L;
        /** Handle to the materialized snapshot data. */
        final StateHandle<DataInputView> stateHandle;
        /** Id of the checkpoint this snapshot belongs to. */
        final long checkpointId;

        /**
         * @param stateHandle handle to the materialized data; must not be null
         * @param checkpointId id of the checkpoint
         */
        private FinalFullyAsyncSnapshot(StateHandle<DataInputView> stateHandle, long checkpointId) {
            this.checkpointId = checkpointId;
            this.stateHandle = Objects.requireNonNull(stateHandle);
        }

        /**
         * Not supported for this snapshot type.
         *
         * @throws RuntimeException always
         */
        public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(RocksDBStateBackend stateBackend, TypeSerializer<Object> keySerializer, ClassLoader classLoader) throws Exception {
            throw new RuntimeException("Should never happen.");
        }

        /** Discards the data behind the state handle. */
        public final void discardState() throws Exception {
            stateHandle.discardState();
        }

        /** Returns the size reported by the state handle. */
        public final long getStateSize() throws Exception {
            return stateHandle.getStateSize();
        }

        /** Closes the state handle. */
        public void close() throws IOException {
            stateHandle.close();
        }
    }

    private class FullyAsyncSnapshot
    extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1L;
        private transient Snapshot snapshot;
        private transient AbstractStateBackend backend;
        private final SerializableObject lock = new SerializableObject();
        private final URI backupUri;
        private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
        private final long checkpointId;
        private volatile boolean discarded;

        /**
         * Creates a snapshot that has not been materialized or discarded yet.
         *
         * @param snapshot RocksDB snapshot to read from during materialization
         * @param backend backend used to create the checkpoint output view
         * @param backupUri checkpoint location, used in the log message of the materialization
         * @param columnFamilies state name to (column family handle, state descriptor) mapping
         * @param checkpointId id of the checkpoint
         */
        private FullyAsyncSnapshot(Snapshot snapshot, AbstractStateBackend backend, URI backupUri, Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies, long checkpointId) {
            this.discarded = false;
            this.checkpointId = checkpointId;
            this.backupUri = backupUri;
            this.columnFamilies = columnFamilies;
            this.backend = backend;
            this.snapshot = snapshot;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        /**
         * Writes the contents of the RocksDB snapshot into a checkpoint output view and returns a
         * {@code FinalFullyAsyncSnapshot} pointing at the written data.
         *
         * <p>Stream layout: the number of column families, then one
         * (id byte, serialized state descriptor) pair per family, then
         * (id byte, key, value) records for every entry of every family.
         *
         * <p>{@code discardState()} is called once writing has finished, successfully or not.
         *
         * @return the materialized snapshot
         * @throws Exception if this snapshot was already discarded, or if the output view cannot
         *     be created, written or closed
         */
        public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
            synchronized (this.lock) {
                StateHandle stateHandle;
                AbstractStateBackend.CheckpointStateOutputView outputView;
                if (this.discarded) {
                    throw new Exception("FullyAsyncSnapshot has already been discarded.");
                }
                long startTime = System.currentTimeMillis();
                try {
                    try {
                        outputView = this.backend.createCheckpointStateOutputView(this.checkpointId, startTime);
                    }
                    catch (Exception e) {
                        throw new Exception("Could not create a checkpoint state output view to materialize the checkpoint data into.", e);
                    }
                    try {
                        // Header: column family count, then (id, descriptor) for each family.
                        outputView.writeInt(this.columnFamilies.size());
                        byte count = 0;
                        HashMap<String, Byte> columnFamilyMapping = new HashMap<String, Byte>();
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : this.columnFamilies.entrySet()) {
                            columnFamilyMapping.put(column.getKey(), count);
                            outputView.writeByte((int)count);
                            ObjectOutputStream ooOut = new ObjectOutputStream((OutputStream)outputView);
                            ooOut.writeObject(column.getValue().f1);
                            ooOut.flush();
                            count = (byte)(count + 1);
                        }
                        ReadOptions readOptions = new ReadOptions();
                        readOptions.setSnapshot(this.snapshot);
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : this.columnFamilies.entrySet()) {
                            byte columnByte = (Byte)columnFamilyMapping.get(column.getKey());
                            // Hold the cleanup lock so that dispose() cannot close the database mid-iteration.
                            synchronized (RocksDBStateBackend.this.dbCleanupLock) {
                                if (RocksDBStateBackend.this.db == null) {
                                    throw new RuntimeException("RocksDB instance was disposed. This happens when we are in the middle of a checkpoint and the job fails.");
                                }
                                RocksIterator iterator = RocksDBStateBackend.this.db.newIterator((ColumnFamilyHandle)column.getValue().f0, readOptions);
                                try {
                                    iterator.seekToFirst();
                                    while (iterator.isValid()) {
                                        outputView.writeByte((int)columnByte);
                                        BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), (DataOutputView)outputView);
                                        BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), (DataOutputView)outputView);
                                        iterator.next();
                                    }
                                }
                                finally {
                                    // Release the native iterator while the database is still guaranteed to be open.
                                    iterator.dispose();
                                }
                            }
                        }
                    }
                    catch (Exception e) {
                        // Only a failing close() is worth a warning; the write failure itself is rethrown below.
                        try {
                            outputView.close();
                        }
                        catch (Exception closingException) {
                            LOG.warn("Could not close the checkpoint state output view. The written data might not be deleted.", (Throwable)closingException);
                        }
                        throw new Exception("Could not write the checkpoint data into the checkpoint state output view.", e);
                    }
                }
                finally {
                    this.discardState();
                }
                try {
                    stateHandle = outputView.closeAndGetHandle();
                }
                catch (Exception ioE) {
                    throw new Exception("Could not close the checkpoint state output view and obtain the state handle.", ioE);
                }
                long endTime = System.currentTimeMillis();
                LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", (Object)this.backupUri, (Object)(endTime - startTime));
                return new FinalFullyAsyncSnapshot(stateHandle, this.checkpointId);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Releases the RocksDB snapshot referenced by this object, at most once.
         *
         * <p>The {@code discarded} flag is tested once without holding {@code lock} and again
         * while holding it, so only the first caller proceeds to the release step. The snapshot
         * is released while holding the backend's {@code dbCleanupLock}, and only if the
         * backend's {@code db} reference is not {@code null}.
         *
         * @throws Exception declared by the signature; the visible statements throw no checked exception
         */
        public void discardState() throws Exception {
            if (this.discarded) {
                return;
            }
            // Captured before the field is cleared below so it can still be released afterwards.
            Snapshot pendingSnapshot = this.snapshot;
            synchronized (this.lock) {
                if (this.discarded) {
                    // Another caller completed the discard in the meantime.
                    return;
                }
                this.discarded = true;
                this.snapshot = null;
            }
            // The release happens outside this object's lock and under the backend-wide cleanup lock.
            synchronized (RocksDBStateBackend.this.dbCleanupLock) {
                if (RocksDBStateBackend.this.db != null) {
                    RocksDBStateBackend.this.db.releaseSnapshot(pendingSnapshot);
                }
            }
        }
    }

    /**
     * Snapshot handle returned by {@code SemiAsyncSnapshot.materialize()} once the local backup
     * has been copied to {@link #backupUri}.
     *
     * <p>It can report the size of the remote backup and delete it. It cannot be restored
     * directly: {@link #restoreState} always throws.
     */
    private static class FinalSemiAsyncSnapshot
    implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1L;
        /** Location of the materialized backup. */
        final URI backupUri;
        /** Id of the checkpoint this backup was taken for. */
        final long checkpointId;
        /** State descriptors handed over at construction; not read inside this class. */
        private final List<StateDescriptor> stateDescriptors;

        private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List<StateDescriptor> stateDescriptors) {
            this.stateDescriptors = stateDescriptors;
            this.checkpointId = checkpointId;
            this.backupUri = backupUri;
        }

        /**
         * Not supported on this snapshot type.
         *
         * @throws RuntimeException always
         */
        public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(RocksDBStateBackend stateBackend, TypeSerializer<Object> keySerializer, ClassLoader classLoader) throws Exception {
            throw new RuntimeException("Should never happen.");
        }

        /**
         * Recursively deletes everything stored under {@link #backupUri}.
         *
         * @throws Exception if the file system cannot be obtained or the delete call fails
         */
        public final void discardState() throws Exception {
            FileSystem backupFs = this.openBackupFileSystem();
            org.apache.hadoop.fs.Path backupLocation = new org.apache.hadoop.fs.Path(this.backupUri);
            backupFs.delete(backupLocation, true);
        }

        /**
         * Returns the length reported by the content summary of {@link #backupUri}.
         *
         * @return the summed length in bytes
         * @throws Exception if the file system cannot be obtained or queried
         */
        public final long getStateSize() throws Exception {
            FileSystem backupFs = this.openBackupFileSystem();
            org.apache.hadoop.fs.Path backupLocation = new org.apache.hadoop.fs.Path(this.backupUri);
            return backupFs.getContentSummary(backupLocation).getLength();
        }

        /** Does nothing; this class keeps no open resources between calls. */
        public void close() throws IOException {
        }

        /** Looks up the file system for {@link #backupUri} using the Hadoop configuration. */
        private FileSystem openBackupFileSystem() throws IOException {
            return FileSystem.get(this.backupUri, HadoopFileSystem.getHadoopConfiguration());
        }
    }

    /**
     * Snapshot whose asynchronous part copies a local backup ({@code localBackupPath}) to a
     * remote location ({@code backupUri}).
     *
     * <p>{@link #materialize()} and {@link #discardState()} both synchronize on {@code lock}.
     * The local backup is deleted when the snapshot is discarded, which also happens at the end
     * of every materialization attempt.
     */
    private static class SemiAsyncSnapshot
    extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1L;
        /** Guards {@code discarded} and serializes materialize/discard. */
        private final SerializableObject lock = new SerializableObject();
        /** Local backup that is uploaded and afterwards deleted. */
        private final File localBackupPath;
        /** Remote target of the upload. */
        private final URI backupUri;
        /** State descriptors passed on to the resulting {@link FinalSemiAsyncSnapshot}. */
        private final List<StateDescriptor> stateDescriptors;
        private final long checkpointId;
        /** Set once the local backup has been (attempted to be) deleted. */
        private volatile boolean discarded;

        private SemiAsyncSnapshot(File localBackupPath, URI backupUri, List<StateDescriptor> columnFamilies, long checkpointId) {
            this.localBackupPath = localBackupPath;
            this.backupUri = backupUri;
            this.stateDescriptors = columnFamilies;
            this.checkpointId = checkpointId;
            this.discarded = false;
        }

        /**
         * Copies the local backup to {@code backupUri} and returns a handle to the remote copy.
         *
         * <p>If the copy fails, the remote location is deleted on a best-effort basis and the
         * original failure is rethrown. A failure of that cleanup is only logged, so it can
         * never replace the exception that describes why the copy failed. In every case the
         * snapshot is discarded afterwards, which removes the local backup.
         *
         * @return a {@link FinalSemiAsyncSnapshot} pointing at {@code backupUri}
         * @throws Exception if this snapshot was already discarded, or if the copy fails
         */
        public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
            synchronized (this.lock) {
                if (this.discarded) {
                    throw new Exception("The SemiAsyncSnapshot has already been discarded.");
                }
                try {
                    long startTime = System.currentTimeMillis();
                    HDFSCopyFromLocal.copyFromLocal(this.localBackupPath, this.backupUri);
                    long endTime = System.currentTimeMillis();
                    LOG.info("RocksDB materialization from {} to {} (asynchronous part) took {} ms.", new Object[]{this.localBackupPath, this.backupUri, endTime - startTime});
                    return new FinalSemiAsyncSnapshot(this.backupUri, this.checkpointId, this.stateDescriptors);
                }
                catch (Exception e) {
                    this.deletePartialBackupQuietly();
                    throw e;
                }
                finally {
                    // Runs on success and on failure; the lock is already held by this thread.
                    this.discardState();
                }
            }
        }

        /**
         * Deletes the local backup, at most once.
         *
         * <p>A failed deletion is logged as a warning and otherwise ignored.
         *
         * @throws Exception declared by the signature; the visible statements throw no checked exception
         */
        public void discardState() throws Exception {
            if (this.discarded) {
                return;
            }
            synchronized (this.lock) {
                if (this.discarded) {
                    return;
                }
                this.discarded = true;
                if (!FileUtils.deleteQuietly(this.localBackupPath)) {
                    LOG.warn("Could not delete the local backup file stored at {}.", (Object)this.localBackupPath);
                }
            }
        }

        /**
         * Recursively deletes whatever was written to {@code backupUri}; any failure is logged
         * rather than thrown so that the caller can rethrow its own exception unchanged.
         */
        private void deletePartialBackupQuietly() {
            try {
                FileSystem fs = FileSystem.get(this.backupUri, HadoopFileSystem.getHadoopConfiguration());
                fs.delete(new org.apache.hadoop.fs.Path(this.backupUri), true);
            }
            catch (Exception cleanupException) {
                LOG.warn("Could not delete the partially materialized backup at " + this.backupUri + ". The written data might not be deleted.", (Throwable)cleanupException);
            }
        }
    }
}

