package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
import org.rocksdb.Snapshot;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.class */
public class EmbeddedRocksDBStateBackendTest extends StateBackendTestBase<EmbeddedRocksDBStateBackend> {
    private OneShotLatch blocker;
    private OneShotLatch waiter;
    private BlockerCheckpointStreamFactory testStreamFactory;
    private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
    private List<RocksObject> allCreatedCloseables;
    private ValueState<Integer> testState1;
    private ValueState<String> testState2;

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    @Parameterized.Parameter(0)
    public boolean enableIncrementalCheckpointing;

    @Parameterized.Parameter(1)
    public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
    private String dbPath;
    private RocksDB db = null;
    private ColumnFamilyHandle defaultCFHandle = null;
    private RocksDBStateUploader rocksDBStateUploader = null;
    private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();

    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest$AcceptAllFilter.class */
    private static class AcceptAllFilter implements IOFileFilter {
        private AcceptAllFilter() {
        }

        public boolean accept(File file) {
            return true;
        }

        public boolean accept(File file, String str) {
            return true;
        }
    }

    @Parameterized.Parameters
    public static List<Object[]> modes() {
        return Arrays.asList(new Object[]{true, JobManagerCheckpointStorage::new}, new Object[]{false, () -> {
            return new FileSystemCheckpointStorage(TEMP_FOLDER.newFolder().toURI().toString());
        }});
    }

    public void prepareRocksDB() throws Exception {
        String absolutePath = new File(TEMP_FOLDER.newFolder(), "db").getAbsolutePath();
        ColumnFamilyOptions columnOptions = this.optionsContainer.getColumnOptions();
        ArrayList arrayList = new ArrayList(1);
        this.db = RocksDBOperationUtils.openDB(absolutePath, Collections.emptyList(), arrayList, columnOptions, this.optionsContainer.getDbOptions());
        this.defaultCFHandle = (ColumnFamilyHandle) arrayList.remove(0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getStateBackend, reason: merged with bridge method [inline-methods] */
    public EmbeddedRocksDBStateBackend m1getStateBackend() throws IOException {
        this.dbPath = TEMP_FOLDER.newFolder().getAbsolutePath();
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(this.enableIncrementalCheckpointing);
        Configuration configuration = new Configuration();
        configuration.set(RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
        EmbeddedRocksDBStateBackend configure = embeddedRocksDBStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader());
        configure.setDbStoragePath(this.dbPath);
        return configure;
    }

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        return (CheckpointStorage) this.storageSupplier.get();
    }

    protected boolean isSerializerPresenceRequiredOnRestore() {
        return false;
    }

    protected boolean supportsAsynchronousSnapshots() {
        return true;
    }

    @After
    public void cleanupRocksDB() {
        if (this.keyedStateBackend != null) {
            IOUtils.closeQuietly(this.keyedStateBackend);
            this.keyedStateBackend.dispose();
        }
        IOUtils.closeQuietly(this.defaultCFHandle);
        IOUtils.closeQuietly(this.db);
        IOUtils.closeQuietly(this.optionsContainer);
        if (this.allCreatedCloseables != null) {
            Iterator<RocksObject> it = this.allCreatedCloseables.iterator();
            while (it.hasNext()) {
                ((RocksObject) Mockito.verify(it.next(), VerificationModeFactory.times(1))).close();
            }
            this.allCreatedCloseables = null;
        }
    }

    public void setupRocksKeyedStateBackend() throws Exception {
        this.blocker = new OneShotLatch();
        this.waiter = new OneShotLatch();
        this.testStreamFactory = new BlockerCheckpointStreamFactory(1048576);
        this.testStreamFactory.setBlockerLatch(this.blocker);
        this.testStreamFactory.setWaiterLatch(this.waiter);
        this.testStreamFactory.setAfterNumberInvocations(10);
        prepareRocksDB();
        RocksDBKeyedStateBackendBuilder enableIncrementalCheckpointing = RocksDBTestUtils.builderForTestDB(TEMP_FOLDER.newFolder(), IntSerializer.INSTANCE, (RocksDB) PowerMockito.spy(this.db), this.defaultCFHandle, this.optionsContainer.getColumnOptions()).setEnableIncrementalCheckpointing(this.enableIncrementalCheckpointing);
        if (this.enableIncrementalCheckpointing) {
            this.rocksDBStateUploader = (RocksDBStateUploader) PowerMockito.spy(new RocksDBStateUploader(((Integer) RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()).intValue()));
            enableIncrementalCheckpointing.setRocksDBStateUploader(this.rocksDBStateUploader);
        }
        this.keyedStateBackend = enableIncrementalCheckpointing.build();
        this.testState1 = this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("TestState-1", Integer.class, 0));
        this.testState2 = this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("TestState-2", String.class, ""));
        this.allCreatedCloseables = new ArrayList();
        ((RocksDB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                RocksIterator rocksIterator = (RocksIterator) PowerMockito.spy((RocksIterator) invocationOnMock.callRealMethod());
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add(rocksIterator);
                return rocksIterator;
            }
        }).when(this.keyedStateBackend.db)).newIterator((ColumnFamilyHandle) Matchers.any(ColumnFamilyHandle.class), (ReadOptions) Matchers.any(ReadOptions.class));
        ((RocksDB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Snapshot snapshot = (Snapshot) PowerMockito.spy((Snapshot) invocationOnMock.callRealMethod());
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add(snapshot);
                return snapshot;
            }
        }).when(this.keyedStateBackend.db)).getSnapshot();
        ((RocksDB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendTest.3
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle) PowerMockito.spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add(columnFamilyHandle);
                return columnFamilyHandle;
            }
        }).when(this.keyedStateBackend.db)).createColumnFamily((ColumnFamilyDescriptor) Matchers.any(ColumnFamilyDescriptor.class));
        for (int i = 0; i < 100; i++) {
            this.keyedStateBackend.setCurrentKey(Integer.valueOf(i));
            this.testState1.update(Integer.valueOf(4200 + i));
            this.testState2.update("S-" + (4200 + i));
        }
    }

    @Test
    public void testCorrectMergeOperatorSet() throws Exception {
        prepareRocksDB();
        ColumnFamilyOptions columnFamilyOptions = (ColumnFamilyOptions) PowerMockito.spy(new ColumnFamilyOptions());
        RocksDBKeyedStateBackend rocksDBKeyedStateBackend = null;
        try {
            rocksDBKeyedStateBackend = RocksDBTestUtils.builderForTestDB(TEMP_FOLDER.newFolder(), IntSerializer.INSTANCE, this.db, this.defaultCFHandle, columnFamilyOptions).setEnableIncrementalCheckpointing(this.enableIncrementalCheckpointing).build();
            rocksDBKeyedStateBackend.createInternalState(StringSerializer.INSTANCE, new ValueStateDescriptor("StubState-1", StringSerializer.INSTANCE));
            rocksDBKeyedStateBackend.createInternalState(StringSerializer.INSTANCE, new ValueStateDescriptor("StubState-2", StringSerializer.INSTANCE));
            ((ColumnFamilyOptions) Mockito.verify(columnFamilyOptions, Mockito.times(2))).setMergeOperatorName("stringappendtest");
            if (rocksDBKeyedStateBackend != null) {
                IOUtils.closeQuietly(rocksDBKeyedStateBackend);
                rocksDBKeyedStateBackend.dispose();
            }
            columnFamilyOptions.close();
        } catch (Throwable th) {
            if (rocksDBKeyedStateBackend != null) {
                IOUtils.closeQuietly(rocksDBKeyedStateBackend);
                rocksDBKeyedStateBackend.dispose();
            }
            columnFamilyOptions.close();
            throw th;
        }
    }

    @Test
    public void testReleasingSnapshotAfterBackendClosed() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            RocksDB rocksDB = this.keyedStateBackend.db;
            if (!this.enableIncrementalCheckpointing) {
                ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).getSnapshot();
                ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(0))).releaseSnapshot((Snapshot) Matchers.any(Snapshot.class));
            }
            Iterator<RocksObject> it = this.allCreatedCloseables.iterator();
            while (it.hasNext()) {
                ((RocksObject) Mockito.verify(it.next(), VerificationModeFactory.times(0))).close();
            }
            snapshot.cancel(true);
            this.keyedStateBackend.dispose();
            ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).close();
            Assert.assertEquals(true, Boolean.valueOf(this.keyedStateBackend.isDisposed()));
            Iterator<RocksObject> it2 = this.allCreatedCloseables.iterator();
            while (it2.hasNext()) {
                ((RocksObject) Mockito.verify(it2.next(), VerificationModeFactory.times(1))).close();
            }
            verifyRocksDBStateUploaderClosed();
        } finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
    }

    @Test
    public void testDismissingSnapshot() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()).cancel(true);
            verifyRocksObjectsReleased();
            verifyRocksDBStateUploaderClosed();
        } finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
    }

    @Test
    public void testDismissingSnapshotNotRunnable() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshot.cancel(true);
            Thread thread = new Thread(snapshot);
            thread.start();
            try {
                snapshot.get();
                Assert.fail();
            } catch (Exception e) {
            }
            thread.join();
            verifyRocksObjectsReleased();
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            verifyRocksDBStateUploaderClosed();
        } catch (Throwable th) {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            throw th;
        }
    }

    @Test
    public void testCompletingSnapshot() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread thread = new Thread(snapshot);
            thread.start();
            this.waiter.await();
            this.waiter.reset();
            runStateUpdates();
            this.blocker.trigger();
            this.waiter.await();
            KeyedStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) snapshot.get()).getJobManagerOwnedSnapshot();
            TestCase.assertNotNull(jobManagerOwnedSnapshot);
            Assert.assertTrue(jobManagerOwnedSnapshot.getStateSize() > 0);
            Assert.assertEquals(2L, jobManagerOwnedSnapshot.getKeyGroupRange().getNumberOfKeyGroups());
            Iterator it = this.testStreamFactory.getAllCreatedStreams().iterator();
            while (it.hasNext()) {
                Assert.assertTrue(((BlockingCheckpointOutputStream) it.next()).isClosed());
            }
            thread.join();
            verifyRocksObjectsReleased();
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            verifyRocksDBStateUploaderClosed();
        } catch (Throwable th) {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            throw th;
        }
    }

    @Test
    public void testCancelRunningSnapshot() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread thread = new Thread(snapshot);
            thread.start();
            this.waiter.await();
            this.waiter.reset();
            runStateUpdates();
            snapshot.cancel(true);
            this.blocker.trigger();
            Iterator it = this.testStreamFactory.getAllCreatedStreams().iterator();
            while (it.hasNext()) {
                Assert.assertTrue(((BlockingCheckpointOutputStream) it.next()).isClosed());
            }
            this.waiter.await();
            try {
                snapshot.get();
                Assert.fail();
            } catch (Exception e) {
            }
            thread.join();
            verifyRocksObjectsReleased();
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            verifyRocksDBStateUploaderClosed();
        } catch (Throwable th) {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            throw th;
        }
    }

    @Test
    public void testDisposeDeletesAllDirectories() throws Exception {
        CheckpointableKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        Collection listFilesAndDirs = FileUtils.listFilesAndDirs(new File(this.dbPath), new AcceptAllFilter(), new AcceptAllFilter());
        try {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class, (Object) null);
            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.update("Hello");
            Assert.assertTrue(listFilesAndDirs.size() > 1);
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
            Assert.assertEquals(1L, FileUtils.listFilesAndDirs(new File(this.dbPath), new AcceptAllFilter(), new AcceptAllFilter()).size());
        } catch (Throwable th) {
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testSharedIncrementalStateDeRegistration() throws Exception {
        if (this.enableIncrementalCheckpointing) {
            CheckpointListener createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            try {
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class, (Object) null);
                valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
                ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
                LinkedList linkedList = new LinkedList();
                SharedStateRegistry sharedStateRegistry = (SharedStateRegistry) PowerMockito.spy(new SharedStateRegistry());
                for (int i = 0; i < 3; i++) {
                    Mockito.reset(new SharedStateRegistry[]{sharedStateRegistry});
                    createKeyedBackend.setCurrentKey(Integer.valueOf(i));
                    partitionedState.update("Hello-" + i);
                    RunnableFuture snapshot = createKeyedBackend.snapshot(i, i, createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation());
                    snapshot.run();
                    IncrementalRemoteKeyedStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) snapshot.get()).getJobManagerOwnedSnapshot();
                    HashMap hashMap = new HashMap(jobManagerOwnedSnapshot.getSharedState());
                    jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
                    for (Map.Entry entry : hashMap.entrySet()) {
                        ((SharedStateRegistry) Mockito.verify(sharedStateRegistry)).registerReference(jobManagerOwnedSnapshot.createSharedStateRegistryKeyFromFileName((StateHandleID) entry.getKey()), (StreamStateHandle) entry.getValue());
                    }
                    linkedList.add(jobManagerOwnedSnapshot);
                    createKeyedBackend.notifyCheckpointComplete(i);
                    if (linkedList.size() > 1) {
                        checkRemove((IncrementalRemoteKeyedStateHandle) linkedList.remove(), sharedStateRegistry);
                    }
                }
                while (!linkedList.isEmpty()) {
                    Mockito.reset(new SharedStateRegistry[]{sharedStateRegistry});
                    checkRemove((IncrementalRemoteKeyedStateHandle) linkedList.remove(), sharedStateRegistry);
                }
            } finally {
                IOUtils.closeQuietly(createKeyedBackend);
                createKeyedBackend.dispose();
            }
        }
    }

    private void checkRemove(IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle, SharedStateRegistry sharedStateRegistry) throws Exception {
        Iterator it = incrementalRemoteKeyedStateHandle.getSharedState().keySet().iterator();
        while (it.hasNext()) {
            ((SharedStateRegistry) Mockito.verify(sharedStateRegistry, VerificationModeFactory.times(0))).unregisterReference(incrementalRemoteKeyedStateHandle.createSharedStateRegistryKeyFromFileName((StateHandleID) it.next()));
        }
        incrementalRemoteKeyedStateHandle.discardState();
        Iterator it2 = incrementalRemoteKeyedStateHandle.getSharedState().keySet().iterator();
        while (it2.hasNext()) {
            ((SharedStateRegistry) Mockito.verify(sharedStateRegistry)).unregisterReference(incrementalRemoteKeyedStateHandle.createSharedStateRegistryKeyFromFileName((StateHandleID) it2.next()));
        }
    }

    private void runStateUpdates() throws Exception {
        for (int i = 50; i < 150; i++) {
            if (i % 10 == 0) {
                Thread.sleep(1L);
            }
            this.keyedStateBackend.setCurrentKey(Integer.valueOf(i));
            this.testState1.update(Integer.valueOf(4200 + i));
            this.testState2.update("S-" + (4200 + i));
        }
    }

    private void verifyRocksObjectsReleased() {
        Iterator<RocksObject> it = this.allCreatedCloseables.iterator();
        while (it.hasNext()) {
            ((RocksObject) Mockito.verify(it.next(), VerificationModeFactory.times(1))).close();
        }
        TestCase.assertNotNull((String) null, this.keyedStateBackend.db);
        RocksDB rocksDB = this.keyedStateBackend.db;
        if (!this.enableIncrementalCheckpointing) {
            ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).getSnapshot();
            ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).releaseSnapshot((Snapshot) Matchers.any(Snapshot.class));
        }
        this.keyedStateBackend.dispose();
        ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).close();
        Assert.assertEquals(true, Boolean.valueOf(this.keyedStateBackend.isDisposed()));
    }

    private void verifyRocksDBStateUploaderClosed() {
        if (this.enableIncrementalCheckpointing) {
            ((RocksDBStateUploader) Mockito.verify(this.rocksDBStateUploader, VerificationModeFactory.times(1))).close();
        }
    }
}
