package org.apache.flink.contrib.streaming.state.restore;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBStateDownloader;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.class */
public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
    private final String operatorIdentifier;
    private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
    private final PriorityQueueFlag queueRestoreEnabled;
    private long lastCompletedCheckpointId;
    private UUID backendUID;
    private final long writeBatchSize;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation$RestoredDBInstance.class */
    public static class RestoredDBInstance implements AutoCloseable {

        @Nonnull
        private final RocksDB db;

        @Nonnull
        private final ColumnFamilyHandle defaultColumnFamilyHandle;

        @Nonnull
        private final List<ColumnFamilyHandle> columnFamilyHandles;

        @Nonnull
        private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;

        @Nonnull
        private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        private final ReadOptions readOptions;

        private RestoredDBInstance(@Nonnull RocksDB rocksDB, @Nonnull List<ColumnFamilyHandle> list, @Nonnull List<ColumnFamilyDescriptor> list2, @Nonnull List<StateMetaInfoSnapshot> list3) {
            this.db = rocksDB;
            this.defaultColumnFamilyHandle = list.remove(0);
            this.columnFamilyHandles = list;
            this.columnFamilyDescriptors = list2;
            this.stateMetaInfoSnapshots = list3;
            this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            ArrayList arrayList = new ArrayList(this.columnFamilyDescriptors.size() + 1);
            this.columnFamilyDescriptors.forEach(columnFamilyDescriptor -> {
                arrayList.add(columnFamilyDescriptor.getOptions());
            });
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(arrayList, this.defaultColumnFamilyHandle);
            IOUtils.closeQuietly(this.defaultColumnFamilyHandle);
            IOUtils.closeAllQuietly(this.columnFamilyHandles);
            IOUtils.closeQuietly(this.db);
            IOUtils.closeAllQuietly(arrayList);
            IOUtils.closeQuietly(this.readOptions);
        }
    }

    public RocksDBIncrementalRestoreOperation(String str, KeyGroupRange keyGroupRange, int i, int i2, CloseableRegistry closeableRegistry, ClassLoader classLoader, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> map, StateSerializerProvider<K> stateSerializerProvider, File file, File file2, DBOptions dBOptions, Function<String, ColumnFamilyOptions> function, RocksDBNativeMetricOptions rocksDBNativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, @Nonnull RocksDbTtlCompactFiltersManager rocksDbTtlCompactFiltersManager, @Nonnegative long j, Long l, PriorityQueueFlag priorityQueueFlag) {
        super(keyGroupRange, i, i2, closeableRegistry, classLoader, map, stateSerializerProvider, file, file2, dBOptions, function, rocksDBNativeMetricOptions, metricGroup, collection, rocksDbTtlCompactFiltersManager, l);
        this.operatorIdentifier = str;
        this.restoredSstFiles = new TreeMap();
        this.lastCompletedCheckpointId = -1L;
        this.backendUID = UUID.randomUUID();
        this.queueRestoreEnabled = priorityQueueFlag;
        Preconditions.checkArgument(j >= 0, "Write batch size have to be no negative.");
        this.writeBatchSize = j;
    }

    @Override // org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation, org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation
    /* renamed from: restore */
    public RocksDBRestoreResult mo32restore() throws Exception {
        if (this.restoreStateHandles == null || this.restoreStateHandles.isEmpty()) {
            return null;
        }
        KeyedStateHandle next = this.restoreStateHandles.iterator().next();
        if (this.restoreStateHandles.size() > 1 || !Objects.equals(next.getKeyGroupRange(), this.keyGroupRange)) {
            restoreWithRescaling(this.restoreStateHandles);
        } else {
            restoreWithoutRescaling(next);
        }
        return new RocksDBRestoreResult(this.db, this.defaultColumnFamilyHandle, this.nativeMetricMonitor, this.lastCompletedCheckpointId, this.backendUID, this.restoredSstFiles);
    }

    private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
        this.logger.info("Starting to restore from state handle: {} without rescaling.", keyedStateHandle);
        if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
            restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
            restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
        } else {
            if (!(keyedStateHandle instanceof IncrementalLocalKeyedStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(new Class[]{IncrementalRemoteKeyedStateHandle.class, IncrementalLocalKeyedStateHandle.class}, keyedStateHandle.getClass());
            }
            IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle = (IncrementalLocalKeyedStateHandle) keyedStateHandle;
            restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
            restoreFromLocalState(incrementalLocalKeyedStateHandle);
        }
        this.logger.info("Finished restoring from state handle: {} without rescaling.", keyedStateHandle);
    }

    private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle incrementalKeyedStateHandle) {
        this.backendUID = incrementalKeyedStateHandle.getBackendIdentifier();
        this.restoredSstFiles.put(Long.valueOf(incrementalKeyedStateHandle.getCheckpointId()), incrementalKeyedStateHandle.getSharedStateHandleIDs());
        this.lastCompletedCheckpointId = incrementalKeyedStateHandle.getCheckpointId();
    }

    private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle) throws Exception {
        Path resolve = this.instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
        try {
            restoreFromLocalState(transferRemoteStateToLocalDirectory(resolve, incrementalRemoteKeyedStateHandle));
        } finally {
            cleanUpPathQuietly(resolve);
        }
    }

    private void restoreFromLocalState(IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle) throws Exception {
        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = readMetaData(incrementalLocalKeyedStateHandle.getMetaDataState()).getStateMetaInfoSnapshots();
        this.columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true, this.writeBufferManagerCapacity);
        this.columnFamilyHandles = new ArrayList(this.columnFamilyDescriptors.size() + 1);
        Path directory = incrementalLocalKeyedStateHandle.getDirectoryStateHandle().getDirectory();
        this.logger.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", this.operatorIdentifier, this.backendUID);
        if (!this.instanceRocksDBPath.mkdirs()) {
            String str = "Could not create RocksDB data directory: " + this.instanceBasePath.getAbsolutePath();
            this.logger.error(str);
            throw new IOException(str);
        }
        restoreInstanceDirectoryFromPath(directory, this.dbPath);
        openDB();
        registerColumnFamilyHandles(stateMetaInfoSnapshots);
    }

    private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(Path path, IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle) throws Exception {
        RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(this.numberOfTransferringThreads);
        Throwable th = null;
        try {
            try {
                rocksDBStateDownloader.transferAllStateDataToDirectory(incrementalRemoteKeyedStateHandle, path, this.cancelStreamRegistry);
                if (rocksDBStateDownloader != null) {
                    if (0 != 0) {
                        try {
                            rocksDBStateDownloader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        rocksDBStateDownloader.close();
                    }
                }
                return new IncrementalLocalKeyedStateHandle(incrementalRemoteKeyedStateHandle.getBackendIdentifier(), incrementalRemoteKeyedStateHandle.getCheckpointId(), new DirectoryStateHandle(path), incrementalRemoteKeyedStateHandle.getKeyGroupRange(), incrementalRemoteKeyedStateHandle.getMetaStateHandle(), incrementalRemoteKeyedStateHandle.getSharedState().keySet());
            } finally {
            }
        } catch (Throwable th3) {
            if (rocksDBStateDownloader != null) {
                if (th != null) {
                    try {
                        rocksDBStateDownloader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    rocksDBStateDownloader.close();
                }
            }
            throw th3;
        }
    }

    private void cleanUpPathQuietly(@Nonnull Path path) {
        try {
            FileUtils.deleteDirectory(path.toFile());
        } catch (IOException e) {
            this.logger.warn("Failed to clean up path " + path, e);
        }
    }

    private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> list) {
        for (int i = 0; i < list.size(); i++) {
            getOrRegisterStateColumnFamilyHandle(this.columnFamilyHandles.get(i), list.get(i));
        }
    }

    /* JADX WARN: Finally extract failed */
    private void restoreWithRescaling(Collection<KeyedStateHandle> collection) throws Exception {
        KeyedStateHandle chooseTheBestStateHandleForInitial = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(collection, this.keyGroupRange);
        if (chooseTheBestStateHandleForInitial != null) {
            collection.remove(chooseTheBestStateHandleForInitial);
            initDBWithRescaling(chooseTheBestStateHandleForInitial);
        } else {
            openDB();
        }
        byte[] bArr = new byte[this.keyGroupPrefixBytes];
        RocksDBKeySerializationUtils.serializeKeyGroup(this.keyGroupRange.getStartKeyGroup(), bArr);
        byte[] bArr2 = new byte[this.keyGroupPrefixBytes];
        RocksDBKeySerializationUtils.serializeKeyGroup(this.keyGroupRange.getEndKeyGroup() + 1, bArr2);
        Iterator<KeyedStateHandle> it = collection.iterator();
        while (it.hasNext()) {
            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = (KeyedStateHandle) it.next();
            if (!(incrementalRemoteKeyedStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(IncrementalRemoteKeyedStateHandle.class, incrementalRemoteKeyedStateHandle.getClass());
            }
            this.logger.info("Starting to restore from state handle: {} with rescaling.", incrementalRemoteKeyedStateHandle);
            Path resolve = this.instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
            try {
                RestoredDBInstance restoreDBInstanceFromStateHandle = restoreDBInstanceFromStateHandle(incrementalRemoteKeyedStateHandle, resolve);
                Throwable th = null;
                try {
                    RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeBatchSize);
                    Throwable th2 = null;
                    try {
                        List list = restoreDBInstanceFromStateHandle.columnFamilyDescriptors;
                        List list2 = restoreDBInstanceFromStateHandle.columnFamilyHandles;
                        for (int i = 0; i < list.size(); i++) {
                            ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle) list2.get(i);
                            ColumnFamilyHandle columnFamilyHandle2 = getOrRegisterStateColumnFamilyHandle(null, (StateMetaInfoSnapshot) restoreDBInstanceFromStateHandle.stateMetaInfoSnapshots.get(i)).columnFamilyHandle;
                            RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(restoreDBInstanceFromStateHandle.db, columnFamilyHandle, restoreDBInstanceFromStateHandle.readOptions);
                            Throwable th3 = null;
                            try {
                                try {
                                    rocksIterator.seek(bArr);
                                    while (rocksIterator.isValid() && RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(rocksIterator.key(), bArr2)) {
                                        rocksDBWriteBatchWrapper.put(columnFamilyHandle2, rocksIterator.key(), rocksIterator.value());
                                        rocksIterator.next();
                                    }
                                    if (rocksIterator != null) {
                                        if (0 != 0) {
                                            try {
                                                rocksIterator.close();
                                            } catch (Throwable th4) {
                                                th3.addSuppressed(th4);
                                            }
                                        } else {
                                            rocksIterator.close();
                                        }
                                    }
                                } finally {
                                }
                            } catch (Throwable th5) {
                                if (rocksIterator != null) {
                                    if (th3 != null) {
                                        try {
                                            rocksIterator.close();
                                        } catch (Throwable th6) {
                                            th3.addSuppressed(th6);
                                        }
                                    } else {
                                        rocksIterator.close();
                                    }
                                }
                                throw th5;
                            }
                        }
                        this.logger.info("Finished restoring from state handle: {} with rescaling.", incrementalRemoteKeyedStateHandle);
                        if (rocksDBWriteBatchWrapper != null) {
                            if (0 != 0) {
                                try {
                                    rocksDBWriteBatchWrapper.close();
                                } catch (Throwable th7) {
                                    th2.addSuppressed(th7);
                                }
                            } else {
                                rocksDBWriteBatchWrapper.close();
                            }
                        }
                        if (restoreDBInstanceFromStateHandle != null) {
                            if (0 != 0) {
                                try {
                                    restoreDBInstanceFromStateHandle.close();
                                } catch (Throwable th8) {
                                    th.addSuppressed(th8);
                                }
                            } else {
                                restoreDBInstanceFromStateHandle.close();
                            }
                        }
                    } catch (Throwable th9) {
                        if (rocksDBWriteBatchWrapper != null) {
                            if (0 != 0) {
                                try {
                                    rocksDBWriteBatchWrapper.close();
                                } catch (Throwable th10) {
                                    th2.addSuppressed(th10);
                                }
                            } else {
                                rocksDBWriteBatchWrapper.close();
                            }
                        }
                        throw th9;
                    }
                } catch (Throwable th11) {
                    if (restoreDBInstanceFromStateHandle != null) {
                        if (0 != 0) {
                            try {
                                restoreDBInstanceFromStateHandle.close();
                            } catch (Throwable th12) {
                                th.addSuppressed(th12);
                            }
                        } else {
                            restoreDBInstanceFromStateHandle.close();
                        }
                    }
                    throw th11;
                }
            } finally {
                cleanUpPathQuietly(resolve);
            }
        }
    }

    private void initDBWithRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
        if (!$assertionsDisabled && !(keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
            throw new AssertionError();
        }
        restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) keyedStateHandle);
        try {
            RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.db, this.columnFamilyHandles, this.keyGroupRange, keyedStateHandle.getKeyGroupRange(), this.keyGroupPrefixBytes, this.writeBatchSize);
        } catch (RocksDBException e) {
            this.logger.error("Failed to clip DB after initialization.", e);
            throw new BackendBuildingException("Failed to clip DB after initialization.", e);
        }
    }

    private RestoredDBInstance restoreDBInstanceFromStateHandle(IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle, Path path) throws Exception {
        RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(this.numberOfTransferringThreads);
        Throwable th = null;
        try {
            try {
                rocksDBStateDownloader.transferAllStateDataToDirectory(incrementalRemoteKeyedStateHandle, path, this.cancelStreamRegistry);
                if (rocksDBStateDownloader != null) {
                    if (0 != 0) {
                        try {
                            rocksDBStateDownloader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        rocksDBStateDownloader.close();
                    }
                }
                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = readMetaData(incrementalRemoteKeyedStateHandle.getMetaStateHandle()).getStateMetaInfoSnapshots();
                List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, false, this.writeBufferManagerCapacity);
                ArrayList arrayList = new ArrayList(stateMetaInfoSnapshots.size() + 1);
                return new RestoredDBInstance(RocksDBOperationUtils.openDB(path.toString(), createAndRegisterColumnFamilyDescriptors, arrayList, RocksDBOperationUtils.createColumnFamilyOptions(this.columnFamilyOptionsFactory, "default"), this.dbOptions), arrayList, createAndRegisterColumnFamilyDescriptors, stateMetaInfoSnapshots);
            } finally {
            }
        } catch (Throwable th3) {
            if (rocksDBStateDownloader != null) {
                if (th != null) {
                    try {
                        rocksDBStateDownloader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    rocksDBStateDownloader.close();
                }
            }
            throw th3;
        }
    }

    private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(List<StateMetaInfoSnapshot> list, boolean z, Long l) throws StateMigrationException {
        ArrayList arrayList = new ArrayList(list.size());
        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : list) {
            if (stateMetaInfoSnapshot.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE && this.queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
                throw new StateMigrationException("Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
            }
            arrayList.add(RocksDBOperationUtils.createColumnFamilyDescriptor(RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot), this.columnFamilyOptionsFactory, z ? this.ttlCompactFiltersManager : null, l));
        }
        return arrayList;
    }

    private void restoreInstanceDirectoryFromPath(Path path, String str) throws IOException {
        Path path2 = Paths.get(str, new String[0]);
        for (Path path3 : FileUtils.listDirectory(path)) {
            String path4 = path3.getFileName().toString();
            Path resolve = path2.resolve(path4);
            if (path4.endsWith(RocksSnapshotUtil.SST_FILE_SUFFIX)) {
                Files.createLink(resolve, path3);
            } else {
                Files.copy(path3, resolve, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }

    private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle streamStateHandle) throws Exception {
        InputStream inputStream = null;
        try {
            inputStream = streamStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable(inputStream);
            KeyedBackendSerializationProxy<K> readMetaData = readMetaData((DataInputView) new DataInputViewStreamWrapper(inputStream));
            if (this.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                inputStream.close();
            }
            return readMetaData;
        } catch (Throwable th) {
            if (this.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                inputStream.close();
            }
            throw th;
        }
    }

    static {
        $assertionsDisabled = !RocksDBIncrementalRestoreOperation.class.desiredAssertionStatus();
    }
}
