package org.apache.flink.contrib.streaming.state;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.streaming.util.HDFSCopyFromLocal;
import org.apache.flink.streaming.util.HDFSCopyToLocal;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RestoreOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/AbstractRocksDBState.class */
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>> implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<N> namespaceSerializer;
    protected K currentKey;
    protected N currentNamespace;
    protected final File basePath;
    protected final String checkpointPath;
    protected final File rocksDbPath;
    protected final RocksDB db;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/AbstractRocksDBState$AbstractRocksDBSnapshot.class */
    public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>> implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
        private static final long serialVersionUID = 1;
        private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
        protected final File basePath;
        protected final String checkpointPath;
        protected final URI backupUri;
        protected long checkpointId;
        protected final TypeSerializer<K> keySerializer;
        protected final TypeSerializer<N> namespaceSerializer;
        protected final SD stateDesc;

        public AbstractRocksDBSnapshot(File file, String str, URI uri, long j, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, SD sd) {
            this.basePath = file;
            this.checkpointPath = str;
            this.backupUri = uri;
            this.checkpointId = j;
            this.stateDesc = sd;
            this.keySerializer = typeSerializer;
            this.namespaceSerializer = typeSerializer2;
        }

        protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, SD sd, File file, String str, String str2, Options options) throws Exception;

        public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(RocksDBStateBackend rocksDBStateBackend, TypeSerializer<K> typeSerializer, ClassLoader classLoader, long j) throws Exception {
            if (!this.keySerializer.equals(typeSerializer)) {
                throw new IllegalArgumentException("Cannot restore the state from the snapshot with the given serializers. State (K/V) was serialized with (" + typeSerializer + ") now is (" + typeSerializer + ")");
            }
            if (!this.basePath.exists() && !this.basePath.mkdirs()) {
                throw new RuntimeException("Could not create RocksDB base path " + this.basePath);
            }
            File file = new File(this.basePath, "chk-" + this.checkpointId);
            if (file.exists()) {
                try {
                    LOG.warn("Deleting already existing local backup directory {}.", file);
                    FileUtils.deleteDirectory(file);
                } catch (IOException e) {
                    throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
                }
            }
            HDFSCopyToLocal.copyToLocal(this.backupUri, this.basePath);
            return createRocksDBState(typeSerializer, this.namespaceSerializer, this.stateDesc, this.basePath, this.checkpointPath, file.getAbsolutePath(), rocksDBStateBackend.getRocksDBOptions());
        }

        public final void discardState() throws Exception {
            FileSystem.get(this.backupUri, HadoopFileSystem.getHadoopConfiguration()).delete(new Path(this.backupUri), true);
        }

        public final long getStateSize() throws Exception {
            return FileSystem.get(this.backupUri, HadoopFileSystem.getHadoopConfiguration()).getContentSummary(new Path(this.backupUri)).getLength();
        }
    }

    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/AbstractRocksDBState$AsyncRocksDBSnapshot.class */
    private static class AsyncRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>> extends AsynchronousKvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
        private static final long serialVersionUID = 1;
        private final File localBackupPath;
        private final URI backupUri;
        private final long checkpointId;
        private transient AbstractRocksDBState<K, N, S, SD> state;

        public AsyncRocksDBSnapshot(File file, URI uri, long j, AbstractRocksDBState<K, N, S, SD> abstractRocksDBState) {
            this.localBackupPath = file;
            this.backupUri = uri;
            this.checkpointId = j;
            this.state = abstractRocksDBState;
        }

        public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> materialize() throws Exception {
            try {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    HDFSCopyFromLocal.copyFromLocal(this.localBackupPath, this.backupUri);
                    AbstractRocksDBState.LOG.info("RocksDB materialization from " + this.localBackupPath + " to " + this.backupUri + " (asynchronous part) took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
                    AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot = this.state.createRocksDBSnapshot(this.backupUri, this.checkpointId);
                    FileUtils.deleteQuietly(this.localBackupPath);
                    return createRocksDBSnapshot;
                } catch (Exception e) {
                    FileSystem.get(this.backupUri, HadoopFileSystem.getHadoopConfiguration()).delete(new Path(this.backupUri), true);
                    throw e;
                }
            } catch (Throwable th) {
                FileUtils.deleteQuietly(this.localBackupPath);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRocksDBState(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, File file, String str, Options options) {
        this.rocksDbPath = new File(file, "db" + UUID.randomUUID().toString());
        this.keySerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer);
        this.namespaceSerializer = typeSerializer2;
        this.basePath = file;
        this.checkpointPath = str;
        RocksDB.loadLibrary();
        if (!file.exists() && !file.mkdirs()) {
            throw new RuntimeException("Could not create RocksDB data directory.");
        }
        try {
            if (this.rocksDbPath.exists()) {
                LOG.warn("Deleting already existing db directory {}.", this.rocksDbPath);
                FileUtils.deleteDirectory(this.rocksDbPath);
            }
            try {
                this.db = RocksDB.open(options, this.rocksDbPath.getAbsolutePath());
            } catch (RocksDBException e) {
                throw new RuntimeException("Error while opening RocksDB instance.", e);
            }
        } catch (IOException e2) {
            throw new RuntimeException("Error cleaning RocksDB data directory.", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRocksDBState(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, File file, String str, String str2, Options options) {
        this.rocksDbPath = new File(file, "db" + UUID.randomUUID().toString());
        RocksDB.loadLibrary();
        try {
            if (this.rocksDbPath.exists()) {
                LOG.warn("Deleting already existing db directory {}.", this.rocksDbPath);
                FileUtils.deleteDirectory(this.rocksDbPath);
            }
            try {
                try {
                    BackupEngine open = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(str2 + "/"));
                    Throwable th = null;
                    try {
                        try {
                            open.restoreDbFromLatestBackup(this.rocksDbPath.getAbsolutePath(), this.rocksDbPath.getAbsolutePath(), new RestoreOptions(true));
                            if (open != null) {
                                if (0 != 0) {
                                    try {
                                        open.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    open.close();
                                }
                            }
                            this.keySerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer);
                            this.namespaceSerializer = typeSerializer2;
                            this.basePath = file;
                            this.checkpointPath = str;
                            if (!file.exists() && !file.mkdirs()) {
                                throw new RuntimeException("Could not create RocksDB data directory.");
                            }
                            try {
                                this.db = RocksDB.open(options, this.rocksDbPath.getAbsolutePath());
                            } catch (RocksDBException e) {
                                throw new RuntimeException("Error while opening RocksDB instance.", e);
                            }
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (open != null) {
                            if (th != null) {
                                try {
                                    open.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                open.close();
                            }
                        }
                        throw th3;
                    }
                } catch (RocksDBException | IllegalArgumentException e2) {
                    throw new RuntimeException("Error while restoring RocksDB state from " + str2, e2);
                }
            } finally {
                try {
                    FileUtils.deleteDirectory(new File(str2));
                } catch (IOException e3) {
                    LOG.error("Error cleaning up local restore directory " + str2, e3);
                }
            }
        } catch (IOException e4) {
            throw new RuntimeException("Error cleaning RocksDB data directory.", e4);
        }
    }

    public final void clear() {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            writeKeyAndNamespace(new DataOutputViewStreamWrapper(byteArrayOutputStream));
            this.db.remove(byteArrayOutputStream.toByteArray());
        } catch (IOException | RocksDBException e) {
            throw new RuntimeException("Error while removing entry from RocksDB", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeKeyAndNamespace(DataOutputView dataOutputView) throws IOException {
        this.keySerializer.serialize(this.currentKey, dataOutputView);
        dataOutputView.writeByte(42);
        this.namespaceSerializer.serialize(this.currentNamespace, dataOutputView);
    }

    public final void setCurrentKey(K k) {
        this.currentKey = k;
    }

    public final void setCurrentNamespace(N n) {
        this.currentNamespace = n;
    }

    protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI uri, long j);

    public final KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long j, long j2) throws Exception {
        File file = new File(this.basePath, "local-chk-" + j);
        URI uri = new URI(this.checkpointPath + "/chk-" + j);
        if (!file.exists() && !file.mkdirs()) {
            throw new RuntimeException("Could not create local backup path " + file);
        }
        long currentTimeMillis = System.currentTimeMillis();
        BackupableDBOptions backupableDBOptions = new BackupableDBOptions(file.getAbsolutePath());
        backupableDBOptions.setBackupLogFiles(false);
        backupableDBOptions.setSync(false);
        BackupEngine open = BackupEngine.open(Env.getDefault(), backupableDBOptions);
        Throwable th = null;
        try {
            try {
                this.db.flush(new FlushOptions().setWaitForFlush(true));
                open.createNewBackup(this.db);
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                LOG.info("RocksDB (" + this.rocksDbPath + ") backup (synchronous part) took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
                return new AsyncRocksDBSnapshot(file, uri, j, this);
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    public final void dispose() {
        this.db.dispose();
        try {
            FileUtils.deleteDirectory(this.basePath);
        } catch (IOException e) {
            throw new RuntimeException("Error disposing RocksDB data directory.", e);
        }
    }
}
