package org.apache.flink.contrib.streaming.state;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.util.HDFSCopyFromLocal;
import org.apache.flink.streaming.util.HDFSCopyToLocal;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.fs.FileSystem;
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.ReadOptions;
import org.rocksdb.RestoreOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend.class */
public class RocksDBStateBackend extends AbstractStateBackend {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
    // Root of the checkpoint location; getCheckpointPath() appends job id, operator identifier and a name to it.
    private final Path checkpointDirectory;
    // Backend that initializeForJob/dispose/close/disposeAllStateForCurrentJob are forwarded to.
    private final AbstractStateBackend nonPartitionedStateBackend;
    // Selects performFullyAsyncSnapshot (true) or performSemiAsyncSnapshot (false) in snapshotPartitionedState.
    private boolean fullyAsyncBackup;
    // Operator identifier with all spaces removed; set in initializeForJob.
    private String operatorIdentifier;
    // Job id taken from the Environment in initializeForJob.
    private JobID jobId;
    // Explicitly configured local base paths; when null the IOManager's spilling directories are used instead.
    private Path[] configuredDbBasePaths;
    // Local base directories that were found usable in initializeForJob.
    private File[] initializedDbBasePaths;
    // Index into initializedDbBasePaths; advanced (with wrap-around) by getNextStoragePath().
    private int nextDirectory;
    // NOTE(review): not read in the visible part of the file; presumably consumed by getDbOptions()/getColumnOptions() - confirm.
    private PredefinedOptions predefinedOptions;
    // NOTE(review): not read in the visible part of the file; presumably consumed by getDbOptions()/getColumnOptions() - confirm.
    private OptionsFactory optionsFactory;
    // Native DB options; disposed and reset to null in dispose().
    private transient DBOptions dbOptions;
    // NOTE(review): not assigned in the visible part of the file; presumably managed by getColumnOptions() - confirm.
    private transient ColumnFamilyOptions columnOptions;
    // Per-job, per-operator, per-subtask local working directory; deleted in dispose().
    private transient File instanceBasePath;
    // "db" sub-directory of instanceBasePath that holds the RocksDB files.
    private transient File instanceRocksDBPath;
    // Checkpoint path for the "dummy_state" name; snapshots are written below it as "chk-<checkpointId>".
    private transient String instanceCheckpointPath;
    // The RocksDB instance; set to null by dispose() while holding dbCleanupLock.
    protected volatile transient RocksDB db;
    // Held while disposing the DB and while asynchronous snapshot code touches it.
    private final SerializableObject dbCleanupLock;
    // State name -> (column family handle, state descriptor).
    private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend$FinalFullyAsyncSnapshot.class */
    /**
     * Final (materialized) result of a fully asynchronous snapshot. It only wraps the
     * {@link StateHandle} the data was written to; restoring goes through
     * {@code injectKeyValueStateSnapshots} of the backend, never through {@code restoreState}.
     */
    public static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1L;

        /** Handle to the written checkpoint data. */
        final StateHandle<DataInputView> stateHandle;

        /** Id of the checkpoint this snapshot belongs to. */
        final long checkpointId;

        /**
         * @param handle handle to the written checkpoint data, must not be null
         * @param checkpointId id of the checkpoint
         */
        private FinalFullyAsyncSnapshot(StateHandle<DataInputView> handle, long checkpointId) {
            Objects.requireNonNull(handle);
            this.stateHandle = handle;
            this.checkpointId = checkpointId;
        }

        /** Not supported on this snapshot type; always throws. */
        public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(RocksDBStateBackend rocksDBStateBackend, TypeSerializer<Object> typeSerializer, ClassLoader classLoader) throws Exception {
            throw new RuntimeException("Should never happen.");
        }

        /** Discards the state behind the wrapped handle. */
        public final void discardState() throws Exception {
            stateHandle.discardState();
        }

        /** Reports the size of the state behind the wrapped handle. */
        public final long getStateSize() throws Exception {
            return stateHandle.getStateSize();
        }

        /** Closes the wrapped handle. */
        public void close() throws IOException {
            stateHandle.close();
        }

        /** Bridge overload that narrows the argument types and forwards to the typed variant. */
        public /* bridge */ /* synthetic */ KvState restoreState(AbstractStateBackend abstractStateBackend, TypeSerializer typeSerializer, ClassLoader classLoader) throws Exception {
            RocksDBStateBackend typedBackend = (RocksDBStateBackend) abstractStateBackend;
            TypeSerializer<Object> typedSerializer = (TypeSerializer<Object>) typeSerializer;
            return restoreState(typedBackend, typedSerializer, classLoader);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend$FinalSemiAsyncSnapshot.class */
    /**
     * Final (materialized) result of a semi asynchronous snapshot. It points at the backup
     * location and carries the state descriptors needed to re-open the column families;
     * restoring goes through {@code injectKeyValueStateSnapshots} of the backend.
     */
    public static class FinalSemiAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1L;

        /** Location the RocksDB backup was copied to. */
        final URI backupUri;

        /** Id of the checkpoint this snapshot belongs to. */
        final long checkpointId;

        /** Descriptors of all states contained in the backup. */
        private final List<StateDescriptor> stateDescriptors;

        private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List<StateDescriptor> stateDescriptors) {
            this.backupUri = backupUri;
            this.checkpointId = checkpointId;
            this.stateDescriptors = stateDescriptors;
        }

        /** Not supported on this snapshot type; always throws. */
        public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(RocksDBStateBackend rocksDBStateBackend, TypeSerializer<Object> typeSerializer, ClassLoader classLoader) throws Exception {
            throw new RuntimeException("Should never happen.");
        }

        /** Recursively deletes the backup location. */
        public final void discardState() throws Exception {
            FileSystem fileSystem = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
            org.apache.hadoop.fs.Path backupPath = new org.apache.hadoop.fs.Path(backupUri);
            fileSystem.delete(backupPath, true);
        }

        /** Returns the length reported by the content summary of the backup location. */
        public final long getStateSize() throws Exception {
            FileSystem fileSystem = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
            org.apache.hadoop.fs.Path backupPath = new org.apache.hadoop.fs.Path(backupUri);
            return fileSystem.getContentSummary(backupPath).getLength();
        }

        /** Nothing to close. */
        public void close() throws IOException {
        }

        /** Bridge overload that narrows the argument types and forwards to the typed variant. */
        public /* bridge */ /* synthetic */ KvState restoreState(AbstractStateBackend abstractStateBackend, TypeSerializer typeSerializer, ClassLoader classLoader) throws Exception {
            RocksDBStateBackend typedBackend = (RocksDBStateBackend) abstractStateBackend;
            TypeSerializer<Object> typedSerializer = (TypeSerializer<Object>) typeSerializer;
            return restoreState(typedBackend, typedSerializer, classLoader);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend$FullyAsyncSnapshot.class */
    public class FullyAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1;
        private transient Snapshot snapshot;
        private transient AbstractStateBackend backend;
        private final SerializableObject lock;
        private final URI backupUri;
        private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
        private final long checkpointId;
        private volatile boolean discarded;

        private FullyAsyncSnapshot(Snapshot snapshot, AbstractStateBackend abstractStateBackend, URI uri, Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> map, long j) {
            this.lock = new SerializableObject();
            this.snapshot = snapshot;
            this.backend = abstractStateBackend;
            this.backupUri = uri;
            this.columnFamilies = map;
            this.checkpointId = j;
            this.discarded = false;
        }

        /* JADX WARN: Finally extract failed */
        public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
            FinalFullyAsyncSnapshot finalFullyAsyncSnapshot;
            synchronized (this.lock) {
                if (this.discarded) {
                    throw new Exception("FullyAsyncSnapshot has already been discarded.");
                }
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    try {
                        AbstractStateBackend.CheckpointStateOutputView createCheckpointStateOutputView = this.backend.createCheckpointStateOutputView(this.checkpointId, currentTimeMillis);
                        try {
                            createCheckpointStateOutputView.writeInt(this.columnFamilies.size());
                            byte b = 0;
                            HashMap hashMap = new HashMap();
                            for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> entry : this.columnFamilies.entrySet()) {
                                hashMap.put(entry.getKey(), Byte.valueOf(b));
                                createCheckpointStateOutputView.writeByte(b);
                                ObjectOutputStream objectOutputStream = new ObjectOutputStream(createCheckpointStateOutputView);
                                objectOutputStream.writeObject(entry.getValue().f1);
                                objectOutputStream.flush();
                                b = (byte) (b + 1);
                            }
                            ReadOptions readOptions = new ReadOptions();
                            readOptions.setSnapshot(this.snapshot);
                            for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> entry2 : this.columnFamilies.entrySet()) {
                                byte byteValue = ((Byte) hashMap.get(entry2.getKey())).byteValue();
                                synchronized (RocksDBStateBackend.this.dbCleanupLock) {
                                    if (RocksDBStateBackend.this.db == null) {
                                        throw new RuntimeException("RocksDB instance was disposed. This happens when we are in the middle of a checkpoint and the job fails.");
                                    }
                                    RocksIterator newIterator = RocksDBStateBackend.this.db.newIterator((ColumnFamilyHandle) entry2.getValue().f0, readOptions);
                                    newIterator.seekToFirst();
                                    while (newIterator.isValid()) {
                                        createCheckpointStateOutputView.writeByte(byteValue);
                                        BytePrimitiveArraySerializer.INSTANCE.serialize(newIterator.key(), createCheckpointStateOutputView);
                                        BytePrimitiveArraySerializer.INSTANCE.serialize(newIterator.value(), createCheckpointStateOutputView);
                                        newIterator.next();
                                    }
                                }
                            }
                            discardState();
                            try {
                                StateHandle closeAndGetHandle = createCheckpointStateOutputView.closeAndGetHandle();
                                RocksDBStateBackend.LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", this.backupUri, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                                finalFullyAsyncSnapshot = new FinalFullyAsyncSnapshot(closeAndGetHandle, this.checkpointId);
                            } catch (Exception e) {
                                throw new Exception("Could not close the checkpoint state output view and obtain the state handle.", e);
                            }
                        } catch (Exception e2) {
                            try {
                                createCheckpointStateOutputView.close();
                            } catch (Exception e3) {
                                RocksDBStateBackend.LOG.warn("Could not close the checkpoint state output view. The written data might not be deleted.", e3);
                            }
                            throw new Exception("Could not write the checkpoint data into the checkpoint state output view.", e2);
                        }
                    } catch (Throwable th) {
                        discardState();
                        throw th;
                    }
                } catch (Exception e4) {
                    throw new Exception("Could not create a checkpoint state output view to materialize the checkpoint data into.", e4);
                }
            }
            return finalFullyAsyncSnapshot;
        }

        public void discardState() throws Exception {
            if (this.discarded) {
                return;
            }
            Snapshot snapshot = this.snapshot;
            synchronized (this.lock) {
                if (this.discarded) {
                    return;
                }
                this.discarded = true;
                this.snapshot = null;
                synchronized (RocksDBStateBackend.this.dbCleanupLock) {
                    if (RocksDBStateBackend.this.db != null) {
                        RocksDBStateBackend.this.db.releaseSnapshot(snapshot);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend$SemiAsyncSnapshot.class */
    /**
     * Asynchronous part of a semi asynchronous snapshot: the RocksDB backup already exists in a
     * local directory and {@link #materialize()} copies it to the backup URI.
     */
    public static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
        private static final long serialVersionUID = 1;
        /** Serializes materialize() and discardState() against each other. */
        private final SerializableObject lock;
        private final File localBackupPath;
        private final URI backupUri;
        private final List<StateDescriptor> stateDescriptors;
        private final long checkpointId;
        private volatile boolean discarded;

        private SemiAsyncSnapshot(File file, URI uri, List<StateDescriptor> list, long j) {
            this.lock = new SerializableObject();
            this.localBackupPath = file;
            this.backupUri = uri;
            this.stateDescriptors = list;
            this.checkpointId = j;
            this.discarded = false;
        }

        /**
         * Copies the local backup to the backup URI and returns the final snapshot. The local
         * backup is deleted afterwards in every case. If the copy fails, the (possibly partial)
         * remote copy is deleted as well and the original failure is rethrown.
         *
         * @return the final snapshot pointing at the backup URI
         * @throws Exception if the snapshot was already discarded or the copy fails
         */
        public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
            synchronized (this.lock) {
                if (this.discarded) {
                    throw new Exception("The SemiAsyncSnapshot has already been discarded.");
                }
                try {
                    long startTime = System.currentTimeMillis();
                    HDFSCopyFromLocal.copyFromLocal(this.localBackupPath, this.backupUri);
                    RocksDBStateBackend.LOG.info("RocksDB materialization from {} to {} (asynchronous part) took {} ms.", new Object[]{this.localBackupPath, this.backupUri, Long.valueOf(System.currentTimeMillis() - startTime)});
                    return new FinalSemiAsyncSnapshot(this.backupUri, this.checkpointId, this.stateDescriptors);
                } catch (Exception e) {
                    try {
                        FileSystem.get(this.backupUri, HadoopFileSystem.getHadoopConfiguration()).delete(new org.apache.hadoop.fs.Path(this.backupUri), true);
                    } catch (Exception cleanupFailure) {
                        // Keep the root cause; a failed cleanup must not replace it.
                        e.addSuppressed(cleanupFailure);
                    }
                    throw e;
                } finally {
                    // Removes the local backup on success and on failure.
                    discardState();
                }
            }
        }

        /**
         * Deletes the local backup directory if that has not happened yet. Safe to call
         * repeatedly; only the first call has an effect. A failed deletion is only logged.
         */
        public void discardState() throws Exception {
            if (this.discarded) {
                return;
            }
            synchronized (this.lock) {
                if (!this.discarded) {
                    this.discarded = true;
                    if (!FileUtils.deleteQuietly(this.localBackupPath)) {
                        RocksDBStateBackend.LOG.warn("Could not delete the local backup file stored at {}.", this.localBackupPath);
                    }
                }
            }
        }
    }

    /**
     * Creates a backend from a checkpoint location given as a string. The string is parsed as
     * a Flink {@link Path}, converted to a URI and passed to the URI constructor.
     *
     * @param str checkpoint location
     * @throws IOException propagated from the URI constructor
     */
    public RocksDBStateBackend(String str) throws IOException {
        this(new Path(str).toUri());
    }

    /**
     * Creates a backend that checkpoints below the given URI. A new {@link FsStateBackend} on
     * the same URI serves as the non-partitioned state backend, and its base path becomes the
     * checkpoint directory.
     *
     * @param uri checkpoint location
     * @throws IOException propagated from the {@link FsStateBackend} constructor
     */
    public RocksDBStateBackend(URI uri) throws IOException {
        FsStateBackend fileBackend = new FsStateBackend(uri);
        this.nonPartitionedStateBackend = fileBackend;
        this.checkpointDirectory = fileBackend.getBasePath();

        // Defaults shared by all constructors.
        this.dbCleanupLock = new SerializableObject();
        this.predefinedOptions = PredefinedOptions.DEFAULT_ROCKS_4_5_1;
        this.fullyAsyncBackup = false;
    }

    /**
     * Creates a backend from a checkpoint location given as a string and an explicit backend
     * for non-partitioned state. The string is parsed as a Flink {@link Path}, converted to a
     * URI and passed to the (URI, backend) constructor.
     *
     * @param str checkpoint location
     * @param abstractStateBackend backend used for non-partitioned state
     * @throws IOException propagated from the (URI, backend) constructor
     */
    public RocksDBStateBackend(String str, AbstractStateBackend abstractStateBackend) throws IOException {
        this(new Path(str).toUri(), abstractStateBackend);
    }

    /**
     * Creates a backend that checkpoints below the given URI and uses the supplied backend for
     * non-partitioned state. The URI is validated and normalized via
     * {@link FsStateBackend#validateAndNormalizeUri}.
     *
     * @param uri checkpoint location
     * @param abstractStateBackend backend used for non-partitioned state, must not be null
     * @throws IOException declared for the URI validation
     */
    public RocksDBStateBackend(URI uri, AbstractStateBackend abstractStateBackend) throws IOException {
        // Defaults shared by all constructors.
        this.dbCleanupLock = new SerializableObject();
        this.predefinedOptions = PredefinedOptions.DEFAULT_ROCKS_4_5_1;
        this.fullyAsyncBackup = false;

        // The null check runs before the URI validation, as before.
        Objects.requireNonNull(abstractStateBackend);
        this.nonPartitionedStateBackend = abstractStateBackend;
        this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(uri);
    }

    /**
     * Prepares this backend for a job: initializes the parent and the non-partitioned backend,
     * determines the usable local base directories, picks one for this instance and opens a
     * fresh RocksDB instance containing only the "default" column family.
     *
     * @param environment task environment (job id, IO manager, task info)
     * @param str operator identifier; spaces are removed for the checkpoint path, while the
     *        unmodified value is used in the local directory name
     * @param typeSerializer key serializer, forwarded to the parent and the non-partitioned backend
     * @throws Exception if none of the configured local directories is usable
     * @throws RuntimeException if the instance directory cannot be created, an existing db
     *         directory cannot be removed, or RocksDB cannot be opened
     */
    public void initializeForJob(Environment environment, String str, TypeSerializer<?> typeSerializer) throws Exception {
        super.initializeForJob(environment, str, typeSerializer);
        nonPartitionedStateBackend.initializeForJob(environment, str, typeSerializer);

        operatorIdentifier = str.replace(" ", "");
        jobId = environment.getJobID();

        if (configuredDbBasePaths != null) {
            List<File> usableDirs = new ArrayList<>(configuredDbBasePaths.length);
            StringBuilder failures = new StringBuilder();
            for (Path configured : configuredDbBasePaths) {
                File baseDir = new File(configured.toUri().getPath());
                // Probe the directory by creating (and afterwards removing) a uniquely named child.
                File probeDir = new File(baseDir, UUID.randomUUID().toString());
                if (!probeDir.mkdirs()) {
                    String message = "Local DB files directory '" + configured + "' does not exist and cannot be created. ";
                    LOG.error(message);
                    failures.append(message);
                } else {
                    usableDirs.add(baseDir);
                }
                probeDir.delete();
            }
            if (usableDirs.isEmpty()) {
                throw new Exception("No local storage directories available. " + failures);
            }
            initializedDbBasePaths = usableDirs.toArray(new File[usableDirs.size()]);
        } else {
            initializedDbBasePaths = environment.getIOManager().getSpillingDirectories();
        }

        // Start the rotation at a random directory.
        nextDirectory = new Random().nextInt(initializedDbBasePaths.length);

        File storageDir = getNextStoragePath();
        String instanceDirName = "job-" + jobId.toString() + "_op-" + str + "-" + environment.getTaskInfo().getIndexOfThisSubtask();
        instanceBasePath = new File(storageDir, instanceDirName);
        instanceCheckpointPath = getCheckpointPath("dummy_state");
        instanceRocksDBPath = new File(instanceBasePath, "db");

        RocksDB.loadLibrary();

        boolean baseDirAvailable = instanceBasePath.exists() || instanceBasePath.mkdirs();
        if (!baseDirAvailable) {
            throw new RuntimeException("Could not create RocksDB data directory.");
        }

        try {
            if (instanceRocksDBPath.exists()) {
                LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
                FileUtils.deleteDirectory(instanceRocksDBPath);
            }

            List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
            columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
            List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
            try {
                db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
                kvStateInformation = new HashMap();
            } catch (RocksDBException openFailure) {
                throw new RuntimeException("Error while opening RocksDB instance.", openFailure);
            }
        } catch (IOException cleanFailure) {
            throw new RuntimeException("Error cleaning RocksDB data directory.", cleanFailure);
        }
    }

    /**
     * Forwards the request to the non-partitioned state backend; nothing else is disposed here.
     *
     * @throws Exception propagated from the non-partitioned state backend
     */
    public void disposeAllStateForCurrentJob() throws Exception {
        nonPartitionedStateBackend.disposeAllStateForCurrentJob();
    }

    /**
     * Disposes this backend: the parent, the non-partitioned backend, the RocksDB instance with
     * its options and column family handles, and finally the local instance directory.
     *
     * <p>Every step is attempted even if an earlier one failed. The first failure is rethrown
     * at the end with all later failures attached as suppressed exceptions. Deleting the
     * instance directory is skipped when the backend was never initialized for a job.
     *
     * @throws Exception the first failure encountered (wrapped if it is neither an
     *         {@code Exception} nor an {@code Error})
     */
    public void dispose() throws Exception {
        Throwable failure = null;

        try {
            super.dispose();
        } catch (Throwable t) {
            failure = chainDisposeFailure(failure, t);
        }

        try {
            this.nonPartitionedStateBackend.dispose();
        } catch (Throwable t) {
            failure = chainDisposeFailure(failure, t);
        }

        try {
            // Asynchronous snapshot code takes the same lock before touching the DB.
            synchronized (this.dbCleanupLock) {
                if (this.db != null) {
                    if (this.dbOptions != null) {
                        this.dbOptions.dispose();
                        this.dbOptions = null;
                    }
                    for (Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo : this.kvStateInformation.values()) {
                        stateInfo.f0.dispose();
                    }
                    this.db.dispose();
                    this.db = null;
                }
            }
        } catch (Throwable t) {
            failure = chainDisposeFailure(failure, t);
        }

        // instanceBasePath is only set by initializeForJob; without it there is nothing to delete.
        if (this.instanceBasePath != null) {
            try {
                org.apache.flink.util.FileUtils.deleteDirectory(this.instanceBasePath);
            } catch (Throwable t) {
                failure = chainDisposeFailure(failure, t);
            }
        }

        if (failure instanceof Exception) {
            throw (Exception) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure != null) {
            throw new Exception(failure.getMessage(), failure);
        }
    }

    /** Returns {@code first} with {@code next} attached as suppressed, or {@code next} if there is no first failure yet. */
    private static Throwable chainDisposeFailure(Throwable first, Throwable next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Closes the non-partitioned state backend. The RocksDB instance is not touched here.
     *
     * @throws IOException propagated from the non-partitioned state backend
     */
    public void close() throws IOException {
        nonPartitionedStateBackend.close();
    }

    /**
     * Builds {@code <checkpointDirectory>/<jobId>/<operatorIdentifier>/<name>}.
     *
     * @param name last path segment
     * @return the assembled checkpoint path
     */
    private String getCheckpointPath(String name) {
        StringBuilder path = new StringBuilder();
        path.append(checkpointDirectory);
        path.append("/").append(jobId.toString());
        path.append("/").append(operatorIdentifier);
        path.append("/").append(name);
        return path.toString();
    }

    /**
     * Advances the directory index by one, wrapping to zero at the end of the array, and
     * returns the base directory at the new index.
     *
     * @return the next local base directory
     */
    private File getNextStoragePath() {
        int candidate = nextDirectory + 1;
        if (candidate >= initializedDbBasePaths.length) {
            candidate = 0;
        }
        nextDirectory = candidate;
        return initializedDbBasePaths[candidate];
    }

    /**
     * Returns the local base directories selected in {@code initializeForJob}. The internal
     * array itself is returned, not a copy.
     *
     * @return the initialized base directories; null if they have not been set yet
     */
    public File[] getStoragePaths() {
        return initializedDbBasePaths;
    }

    /**
     * Takes a snapshot of the partitioned state, fully or semi asynchronously depending on
     * {@code fullyAsyncBackup}.
     *
     * @param j first argument, forwarded unchanged (used as the checkpoint id by the snapshot methods)
     * @param j2 second argument, forwarded unchanged
     * @return the snapshot registered under "dummy_state", or null if no key/value state exists
     * @throws Exception propagated from the selected snapshot method
     */
    public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long j, long j2) throws Exception {
        boolean nothingToSnapshot = keyValueStatesByName == null || keyValueStatesByName.size() == 0;
        if (nothingToSnapshot) {
            return null;
        }
        if (fullyAsyncBackup) {
            return performFullyAsyncSnapshot(j, j2);
        }
        return performSemiAsyncSnapshot(j, j2);
    }

    /**
     * Synchronous part of a semi asynchronous snapshot: creates a RocksDB backup in a local
     * directory below the instance base path and returns a {@link SemiAsyncSnapshot} that will
     * later copy that backup to the checkpoint location.
     *
     * <p>The backup engine is closed exactly once via try-with-resources and the native backup
     * options are disposed afterwards, on success and on failure.
     *
     * @param checkpointId id of the checkpoint; part of the local and remote directory names
     * @param timestamp not used by this method
     * @return a map containing the snapshot under the key "dummy_state"
     * @throws Exception if the backup cannot be created
     * @throws RuntimeException if the local backup directory cannot be created
     */
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
        File localBackupPath = new File(this.instanceBasePath, "local-chk-" + checkpointId);
        URI backupUri = new URI(this.instanceCheckpointPath + "/chk-" + checkpointId);
        if (!localBackupPath.exists() && !localBackupPath.mkdirs()) {
            throw new RuntimeException("Could not create local backup path " + localBackupPath);
        }

        long startTime = System.currentTimeMillis();
        BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
        try {
            backupOptions.setBackupLogFiles(false);
            backupOptions.setSync(false);
            try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
                backupEngine.createNewBackup(this.db, true);
            }
        } finally {
            // The engine is closed at this point; release the native options object as well.
            backupOptions.dispose();
        }
        LOG.info("RocksDB (" + this.instanceRocksDBPath + ") backup (synchronous part) took " + (System.currentTimeMillis() - startTime) + " ms.");

        // The descriptors are needed on restore to re-open the column families.
        List<StateDescriptor> stateDescriptors = new ArrayList<>(this.kvStateInformation.size());
        for (Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo : this.kvStateInformation.values()) {
            stateDescriptors.add(stateInfo.f1);
        }

        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
        result.put("dummy_state", new SemiAsyncSnapshot(localBackupPath, backupUri, stateDescriptors, checkpointId));
        return result;
    }

    /**
     * Synchronous part of a fully asynchronous snapshot: obtains a RocksDB snapshot and wraps
     * it, together with a copy of the current column family information, into a
     * {@link FullyAsyncSnapshot}.
     *
     * @param checkpointId id of the checkpoint; part of the backup URI
     * @param timestamp not used by this method
     * @return a map containing the snapshot under the key "dummy_state"
     * @throws Exception if the backup URI cannot be built
     */
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
        URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

        long startTime = System.currentTimeMillis();
        Snapshot dbSnapshot = db.getSnapshot();
        long elapsedMillis = System.currentTimeMillis() - startTime;
        LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + elapsedMillis + " ms.");

        // Copy the map so the asynchronous part iterates a stable set of column families.
        Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>(kvStateInformation);

        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
        result.put("dummy_state", new FullyAsyncSnapshot(dbSnapshot, this, backupUri, columnFamiliesCopy, checkpointId));
        return result;
    }

    /**
     * Restores the partitioned state from the snapshot stored under "dummy_state", dispatching
     * on the concrete snapshot type. A null map is ignored.
     *
     * @param hashMap snapshots by name, may be null
     * @throws RuntimeException if the "dummy_state" entry is missing or of an unknown type
     * @throws Exception propagated from the restore methods
     */
    public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> hashMap) throws Exception {
        if (hashMap == null) {
            return;
        }
        KvStateSnapshot restoredSnapshot = hashMap.get("dummy_state");
        if (restoredSnapshot instanceof FinalSemiAsyncSnapshot) {
            restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) restoredSnapshot);
        } else if (restoredSnapshot instanceof FinalFullyAsyncSnapshot) {
            restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) restoredSnapshot);
        } else {
            throw new RuntimeException("Unknown RocksDB snapshot: " + restoredSnapshot);
        }
    }

    /**
     * Restores the RocksDB instance from a semi asynchronous snapshot: disposes the current DB,
     * clears the local db directory, copies the backup to the local instance directory,
     * restores the DB from the latest backup found there and re-opens it with one column
     * family per state descriptor stored in the snapshot (plus "default").
     *
     * @param finalSemiAsyncSnapshot snapshot holding the backup URI, checkpoint id and state descriptors
     * @throws RuntimeException if a directory cannot be created or cleaned, if the restore from
     *         the backup fails, or if the DB cannot be re-opened
     * @throws Exception declared for the remaining calls in the body, e.g. the copy-to-local step
     */
    private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot finalSemiAsyncSnapshot) throws Exception {
        if (!this.instanceBasePath.exists() && !this.instanceBasePath.mkdirs()) {
            throw new RuntimeException("Could not create RocksDB data directory.");
        }
        // NOTE(review): only the DB is disposed here; the column family handles held in
        // kvStateInformation are dropped without an explicit dispose - confirm that is intended.
        this.db.dispose();
        try {
            if (this.instanceRocksDBPath.exists()) {
                LOG.warn("Deleting already existing db directory {}.", this.instanceRocksDBPath);
                FileUtils.deleteDirectory(this.instanceRocksDBPath);
            }
            // Local directory the backup is read from: <instanceBasePath>/chk-<checkpointId>.
            File file = new File(this.instanceBasePath, "chk-" + finalSemiAsyncSnapshot.checkpointId);
            if (file.exists()) {
                try {
                    LOG.warn("Deleting already existing local backup directory {}.", file);
                    FileUtils.deleteDirectory(file);
                } catch (IOException e) {
                    throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
                }
            }
            // Presumably places the remote "chk-<id>" directory below instanceBasePath, i.e. at `file` - verify against HDFSCopyToLocal.
            HDFSCopyToLocal.copyToLocal(finalSemiAsyncSnapshot.backupUri, this.instanceBasePath);
            try {
                try {
                    BackupEngine open = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(file.getAbsolutePath()));
                    // Decompiler scaffold of a try-with-resources: `th` is never assigned, so every
                    // `th != null` / `0 != 0` branch below is dead code.
                    Throwable th = null;
                    try {
                        try {
                            // The same directory is passed for both path arguments of the restore call.
                            open.restoreDbFromLatestBackup(this.instanceRocksDBPath.getAbsolutePath(), this.instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true));
                            if (open != null) {
                                if (0 != 0) {
                                    try {
                                        open.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    open.close();
                                }
                            }
                            // One column family per restored state descriptor, followed by "default".
                            ArrayList arrayList = new ArrayList(finalSemiAsyncSnapshot.stateDescriptors.size());
                            Iterator it = finalSemiAsyncSnapshot.stateDescriptors.iterator();
                            while (it.hasNext()) {
                                arrayList.add(new ColumnFamilyDescriptor(((StateDescriptor) it.next()).getName().getBytes(), getColumnOptions()));
                            }
                            arrayList.add(new ColumnFamilyDescriptor("default".getBytes()));
                            ArrayList arrayList2 = new ArrayList(finalSemiAsyncSnapshot.stateDescriptors.size());
                            try {
                                this.db = RocksDB.open(getDbOptions(), this.instanceRocksDBPath.getAbsolutePath(), arrayList, arrayList2);
                                this.kvStateInformation = new HashMap();
                                // Handle i is paired with descriptor i; this relies on "default" being added last above.
                                for (int i = 0; i < finalSemiAsyncSnapshot.stateDescriptors.size(); i++) {
                                    this.kvStateInformation.put(((StateDescriptor) finalSemiAsyncSnapshot.stateDescriptors.get(i)).getName(), new Tuple2<>(arrayList2.get(i), finalSemiAsyncSnapshot.stateDescriptors.get(i)));
                                }
                            } catch (RocksDBException e2) {
                                throw new RuntimeException("Error while opening RocksDB instance.", e2);
                            }
                        } finally {
                        }
                    } catch (Throwable th3) {
                        // NOTE(review): this handler also covers the re-open above, which runs after
                        // the engine was already closed; a failure there closes the engine a second time.
                        if (open != null) {
                            if (th != null) {
                                try {
                                    open.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                open.close();
                            }
                        }
                        throw th3;
                    }
                } finally {
                    // The local copy of the backup is removed whether or not the restore succeeded.
                    try {
                        FileUtils.deleteDirectory(file);
                    } catch (IOException e3) {
                        LOG.error("Error cleaning up local restore directory " + file, e3);
                    }
                }
            } catch (RocksDBException | IllegalArgumentException e4) {
                throw new RuntimeException("Error while restoring RocksDB state from " + file, e4);
            }
        } catch (IOException e5) {
            // NOTE(review): this handler spans the whole body, so an IOException from any later
            // step (not only the directory cleanup) is reported with this message.
            throw new RuntimeException("Error cleaning RocksDB data directory.", e5);
        }
    }

    /**
     * Refills the RocksDB instance from a fully asynchronous snapshot.
     *
     * <p>The snapshot is read as: an int column-family count, then that many
     * (id byte, Java-serialized {@code StateDescriptor}) pairs, then a sequence of
     * (id byte, key bytes, value bytes) entries that runs until the end of the stream.
     *
     * @param finalFullyAsyncSnapshot snapshot whose state handle supplies the data to read
     * @throws IOException if an entry references a column-family id that was not declared
     *         in the header, or if the stream ends in the middle of an entry
     * @throws Exception for any other failure while reading the snapshot or writing to RocksDB
     */
    private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot finalFullyAsyncSnapshot) throws Exception {
        DataInputView dataInputView = (DataInputView) finalFullyAsyncSnapshot.stateHandle.getState(this.userCodeClassLoader);

        // Forget existing registrations; getColumnFamily() below re-registers each restored descriptor.
        this.kvStateInformation.clear();

        int numColumns = dataInputView.readInt();
        Map<Byte, StateDescriptor> descriptorsById = new HashMap<>(numColumns);
        for (int i = 0; i < numColumns; i++) {
            byte id = dataInputView.readByte();
            StateDescriptor descriptor = (StateDescriptor) new InstantiationUtil.ClassLoaderObjectInputStream(new DataInputViewStream(dataInputView), this.userCodeClassLoader).readObject();
            descriptorsById.put(id, descriptor);
            // Called for its side effect: creates/registers the column family for this descriptor.
            getColumnFamily(descriptor);
        }

        while (true) {
            byte id;
            try {
                id = dataInputView.readByte();
            } catch (EOFException endOfSnapshot) {
                // End of stream at an entry boundary is the normal terminator.
                return;
            }

            StateDescriptor descriptor = descriptorsById.get(id);
            if (descriptor == null) {
                // Fail with a clear message instead of a NullPointerException inside getColumnFamily().
                throw new IOException("Corrupt RocksDB snapshot: entry references unknown column family id " + id);
            }
            ColumnFamilyHandle handle = getColumnFamily(descriptor);

            byte[] key;
            byte[] value;
            try {
                key = BytePrimitiveArraySerializer.INSTANCE.deserialize(dataInputView);
                value = BytePrimitiveArraySerializer.INSTANCE.deserialize(dataInputView);
            } catch (EOFException truncated) {
                // End of stream inside an entry means the snapshot is truncated; do not
                // report a partial restore as success.
                throw new IOException("Corrupt RocksDB snapshot: data ended in the middle of a key/value entry", truncated);
            }
            this.db.put(handle, key, value);
        }
    }

    /**
     * Returns the column family registered under the descriptor's name, creating and
     * registering a new one when no entry exists yet.
     *
     * @param stateDescriptor descriptor whose name keys the column family
     * @return the previously registered handle, or the newly created one
     * @throws RuntimeException if a non-equal descriptor is already registered under the
     *         same name, or if RocksDB fails to create the column family
     */
    protected ColumnFamilyHandle getColumnFamily(StateDescriptor stateDescriptor) {
        final String stateName = stateDescriptor.getName();
        Tuple2<ColumnFamilyHandle, StateDescriptor> registered = this.kvStateInformation.get(stateName);

        if (registered == null) {
            // First access under this name: create the column family and remember it.
            try {
                ColumnFamilyDescriptor familyDescriptor = new ColumnFamilyDescriptor(stateName.getBytes(), getColumnOptions());
                ColumnFamilyHandle handle = this.db.createColumnFamily(familyDescriptor);
                this.kvStateInformation.put(stateName, new Tuple2<>(handle, stateDescriptor));
                return handle;
            } catch (RocksDBException e) {
                throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
            }
        }

        // A name may only ever be used with the descriptor it was first registered with.
        if (!registered.f1.equals(stateDescriptor)) {
            throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + registered.f1 + " trying access with " + stateDescriptor);
        }
        return registered.f0;
    }

    /**
     * @return the value currently held in the {@code currentKey} field
     */
    public Object currentKey() {
        return currentKey;
    }

    /**
     * @return the serializer held in the {@code keySerializer} field
     */
    public TypeSerializer keySerializer() {
        return keySerializer;
    }

    /**
     * Creates a RocksDB-backed {@link ValueState} bound to the column family of the
     * given descriptor.
     *
     * @param typeSerializer serializer handed to the state object (presumably for the
     *        namespace type {@code N} - confirm against {@code RocksDBValueState})
     * @param valueStateDescriptor descriptor that names and describes the state
     * @return a new {@code RocksDBValueState}
     * @throws Exception declared by the signature; the visible body only propagates
     *         runtime failures from {@code getColumnFamily}
     */
    protected <N, T> ValueState<T> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
        final ColumnFamilyHandle columnFamily = getColumnFamily(valueStateDescriptor);
        return new RocksDBValueState(columnFamily, typeSerializer, valueStateDescriptor, this);
    }

    /**
     * Creates a RocksDB-backed {@link ListState} bound to the column family of the
     * given descriptor.
     *
     * @param typeSerializer serializer handed to the state object (presumably for the
     *        namespace type {@code N} - confirm against {@code RocksDBListState})
     * @param listStateDescriptor descriptor that names and describes the state
     * @return a new {@code RocksDBListState}
     * @throws Exception declared by the signature; the visible body only propagates
     *         runtime failures from {@code getColumnFamily}
     */
    protected <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        final ColumnFamilyHandle columnFamily = getColumnFamily(listStateDescriptor);
        return new RocksDBListState(columnFamily, typeSerializer, listStateDescriptor, this);
    }

    /**
     * Creates a RocksDB-backed {@link ReducingState} bound to the column family of the
     * given descriptor.
     *
     * @param typeSerializer serializer handed to the state object (presumably for the
     *        namespace type {@code N} - confirm against {@code RocksDBReducingState})
     * @param reducingStateDescriptor descriptor that names and describes the state
     * @return a new {@code RocksDBReducingState}
     * @throws Exception declared by the signature; the visible body only propagates
     *         runtime failures from {@code getColumnFamily}
     */
    protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        final ColumnFamilyHandle columnFamily = getColumnFamily(reducingStateDescriptor);
        return new RocksDBReducingState(columnFamily, typeSerializer, reducingStateDescriptor, this);
    }

    /**
     * Creates a RocksDB-backed {@link FoldingState} bound to the column family of the
     * given descriptor.
     *
     * @param typeSerializer serializer handed to the state object (presumably for the
     *        namespace type {@code N} - confirm against {@code RocksDBFoldingState})
     * @param foldingStateDescriptor descriptor that names and describes the state
     * @return a new {@code RocksDBFoldingState}
     * @throws Exception declared by the signature; the visible body only propagates
     *         runtime failures from {@code getColumnFamily}
     */
    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        final ColumnFamilyHandle columnFamily = getColumnFamily(foldingStateDescriptor);
        return new RocksDBFoldingState(columnFamily, typeSerializer, foldingStateDescriptor, this);
    }

    /**
     * Delegates stream creation to the wrapped non-partitioned state backend.
     *
     * @param j first argument forwarded unchanged (presumably the checkpoint id - confirm
     *        against {@code AbstractStateBackend})
     * @param j2 second argument forwarded unchanged (presumably the checkpoint timestamp -
     *        confirm against {@code AbstractStateBackend})
     * @return whatever stream the delegate backend creates
     * @throws Exception if the delegate backend throws
     */
    public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long j, long j2) throws Exception {
        return nonPartitionedStateBackend.createCheckpointStateOutputStream(j, j2);
    }

    /**
     * Delegates checkpointing of a serializable object to the wrapped non-partitioned
     * state backend.
     *
     * @param s the serializable state to checkpoint
     * @param j first argument forwarded unchanged (presumably the checkpoint id - confirm
     *        against {@code AbstractStateBackend})
     * @param j2 second argument forwarded unchanged (presumably the checkpoint timestamp -
     *        confirm against {@code AbstractStateBackend})
     * @return the state handle produced by the delegate backend
     * @throws Exception if the delegate backend throws
     */
    public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S s, long j, long j2) throws Exception {
        return nonPartitionedStateBackend.checkpointStateSerializable(s, j, j2);
    }

    /**
     * Turns the {@code fullyAsyncBackup} flag on.
     */
    public void enableFullyAsyncSnapshots() {
        fullyAsyncBackup = true;
    }

    /**
     * Turns the {@code fullyAsyncBackup} flag off.
     */
    public void disableFullyAsyncSnapshots() {
        fullyAsyncBackup = false;
    }

    /**
     * Single-path convenience form of {@link #setDbStoragePaths(String...)}.
     *
     * @param str the one storage path to configure, or {@code null} to pass a
     *        {@code null} array on to {@code setDbStoragePaths}
     */
    public void setDbStoragePath(String str) {
        if (str == null) {
            // Cast keeps this a null array rather than a one-element array holding null.
            setDbStoragePaths((String[]) null);
        } else {
            setDbStoragePaths(new String[] {str});
        }
    }

    /**
     * Configures the base paths stored in {@code configuredDbBasePaths}.
     *
     * <p>Passing {@code null} clears the configuration. Otherwise every entry is turned
     * into a {@link Path} and must either have no URI scheme or the scheme {@code file}
     * (case-insensitive). The field is only assigned after all entries passed validation.
     *
     * @param strArr the paths to configure, or {@code null} to clear them
     * @throws IllegalArgumentException if the array is empty, contains a {@code null}
     *         entry, or contains a path with a scheme other than {@code file}
     */
    public void setDbStoragePaths(String... strArr) {
        if (strArr == null) {
            this.configuredDbBasePaths = null;
        } else if (strArr.length == 0) {
            throw new IllegalArgumentException("empty paths");
        } else {
            final Path[] validated = new Path[strArr.length];
            int slot = 0;
            for (String rawPath : strArr) {
                if (rawPath == null) {
                    throw new IllegalArgumentException("null path");
                }
                Path parsed = new Path(rawPath);
                validated[slot++] = parsed;

                String scheme = parsed.toUri().getScheme();
                boolean isLocal = scheme == null || scheme.equalsIgnoreCase("file");
                if (!isLocal) {
                    throw new IllegalArgumentException("Path " + rawPath + " has a non local scheme");
                }
            }
            this.configuredDbBasePaths = validated;
        }
    }

    /**
     * Returns the configured base paths as strings.
     *
     * @return a new array holding {@code toString()} of each configured path, or
     *         {@code null} when no paths are configured
     */
    public String[] getDbStoragePaths() {
        if (this.configuredDbBasePaths != null) {
            final int count = this.configuredDbBasePaths.length;
            String[] rendered = new String[count];
            for (int idx = 0; idx < count; idx++) {
                rendered[idx] = this.configuredDbBasePaths[idx].toString();
            }
            return rendered;
        }
        return null;
    }

    /**
     * Stores the predefined options to use.
     *
     * @param predefinedOptions the options to store; must not be {@code null}
     * @throws NullPointerException if {@code predefinedOptions} is {@code null}
     */
    public void setPredefinedOptions(PredefinedOptions predefinedOptions) {
        // Reject null before touching the field so the previous value survives a bad call.
        Objects.requireNonNull(predefinedOptions);
        this.predefinedOptions = predefinedOptions;
    }

    /**
     * @return the value currently held in the {@code predefinedOptions} field
     */
    public PredefinedOptions getPredefinedOptions() {
        return predefinedOptions;
    }

    /**
     * Stores the options factory; no validation is performed, so {@code null} is accepted.
     *
     * @param optionsFactory the factory to store in the {@code optionsFactory} field
     */
    public void setOptions(OptionsFactory optionsFactory) {
        this.optionsFactory = optionsFactory;
    }

    /**
     * @return the value currently held in the {@code optionsFactory} field
     */
    public OptionsFactory getOptions() {
        return optionsFactory;
    }

    /**
     * Returns the {@code DBOptions} for this backend, building and caching them on first use.
     *
     * <p>On the first call the options come from {@code predefinedOptions}, are passed
     * through {@code optionsFactory} when one is set, and finally have
     * {@code setCreateIfMissing(true)} applied. Later calls return the cached instance.
     *
     * @return the cached {@code DBOptions}
     */
    public DBOptions getDbOptions() {
        if (this.dbOptions != null) {
            return this.dbOptions;
        }

        DBOptions options = this.predefinedOptions.createDBOptions();
        if (this.optionsFactory != null) {
            options = this.optionsFactory.createDBOptions(options);
        }
        // Applied after the factory, so the factory's own setting for this flag is replaced.
        this.dbOptions = options.setCreateIfMissing(true);
        return this.dbOptions;
    }

    /**
     * Returns the {@code ColumnFamilyOptions} for this backend, building and caching them
     * on first use.
     *
     * <p>On the first call the options come from {@code predefinedOptions} and are passed
     * through {@code optionsFactory} when one is set. Later calls return the cached instance.
     *
     * @return the cached {@code ColumnFamilyOptions}
     */
    public ColumnFamilyOptions getColumnOptions() {
        if (this.columnOptions != null) {
            return this.columnOptions;
        }

        ColumnFamilyOptions options = this.predefinedOptions.createColumnOptions();
        if (this.optionsFactory != null) {
            options = this.optionsFactory.createColumnOptions(options);
        }
        this.columnOptions = options;
        return this.columnOptions;
    }
}
