/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotTransformerTest;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.NestedStateMap;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public abstract class StateBackendTestBase<B extends AbstractStateBackend>
extends TestLogger {
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();
    private CheckpointStreamFactory checkpointStreamFactory;
    private MockEnvironment env;

    @Before
    public void before() throws IOException {
        this.env = this.buildMockEnv();
    }

    @After
    public void after() {
        IOUtils.closeQuietly((AutoCloseable)this.env);
    }

    protected abstract ConfigurableStateBackend getStateBackend() throws Exception;

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        ConfigurableStateBackend stateBackend = this.getStateBackend();
        if (stateBackend instanceof CheckpointStorage) {
            return (CheckpointStorage)stateBackend;
        }
        throw new IllegalStateException("The state backend under test does not implement CheckpointStorage.Please override 'createCheckpointStorage' and provide an appropriatecheckpoint storage instance");
    }

    protected abstract boolean isSerializerPresenceRequiredOnRestore();

    protected abstract boolean supportsAsynchronousSnapshots();

    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStreamFactory == null) {
            this.checkpointStreamFactory = this.getCheckpointStorage().createCheckpointStorage(new JobID()).resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
        }
        return this.checkpointStreamFactory;
    }

    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, this.env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        CheckpointableKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
        return backend;
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, this.env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        return this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), state, new CloseableRegistry());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEnableStateLatencyTracking() throws Exception {
        ConfigurableStateBackend stateBackend = this.getStateBackend();
        Configuration config = new Configuration();
        config.setBoolean(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
        StateBackend configuredBackend = stateBackend.configure((ReadableConfig)config, Thread.currentThread().getContextClassLoader());
        KeyGroupRange groupRange = new KeyGroupRange(0, 1);
        CheckpointableKeyedStateBackend keyedStateBackend = configuredBackend.createKeyedStateBackend((Environment)this.env, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, groupRange.getNumberOfKeyGroups(), groupRange, this.env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
        try {
            CheckpointableKeyedStateBackend nested = keyedStateBackend instanceof TestableKeyedStateBackend ? ((TestableKeyedStateBackend)keyedStateBackend).getDelegatedKeyedStateBackend(true) : keyedStateBackend;
            Assert.assertTrue((boolean)((AbstractKeyedStateBackend)nested).getLatencyTrackingStateConfig().isEnabled());
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)keyedStateBackend);
            keyedStateBackend.dispose();
        }
    }

    @Test
    public void testKeyGroupedInternalPriorityQueue() throws Exception {
        this.testKeyGroupedInternalPriorityQueue(false);
    }

    @Test
    public void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
        this.testKeyGroupedInternalPriorityQueue(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
        String fieldName = "key-grouped-priority-queue";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            KeyGroupedInternalPriorityQueue priorityQueue = backend.create(fieldName, (TypeSerializer)new TestType.V1TestTypeSerializer());
            TestType elementA42 = new TestType("a", 42);
            TestType elementA44 = new TestType("a", 44);
            TestType elementB1 = new TestType("b", 1);
            TestType elementB3 = new TestType("b", 3);
            TestType[] elements = new TestType[]{elementA44, elementB1, elementB1, elementB3, elementA42};
            if (addAll) {
                priorityQueue.addAll(Arrays.asList(elements));
            } else {
                Assert.assertTrue((boolean)priorityQueue.add((Object)elements[0]));
                Assert.assertTrue((boolean)priorityQueue.add((Object)elements[1]));
                Assert.assertFalse((boolean)priorityQueue.add((Object)elements[2]));
                Assert.assertFalse((boolean)priorityQueue.add((Object)elements[3]));
                Assert.assertFalse((boolean)priorityQueue.add((Object)elements[4]));
            }
            Assert.assertFalse((boolean)priorityQueue.isEmpty());
            Assert.assertThat((Object)priorityQueue.getSubsetForKeyGroup(1), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new TestType[]{elementA42, elementA44}));
            Assert.assertThat((Object)priorityQueue.getSubsetForKeyGroup(8), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new TestType[]{elementB1, elementB3}));
            Assert.assertThat((Object)priorityQueue.peek(), (Matcher)org.hamcrest.Matchers.equalTo((Object)elementB1));
            Assert.assertThat((Object)priorityQueue.poll(), (Matcher)org.hamcrest.Matchers.equalTo((Object)elementB1));
            Assert.assertThat((Object)priorityQueue.peek(), (Matcher)org.hamcrest.Matchers.equalTo((Object)elementB3));
            ArrayList actualList = new ArrayList();
            try (CloseableIterator iterator = priorityQueue.iterator();){
                iterator.forEachRemaining(actualList::add);
            }
            Assert.assertThat(actualList, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new TestType[]{elementB3, elementA42, elementA44}));
            Assert.assertEquals((long)3L, (long)priorityQueue.size());
            Assert.assertFalse((boolean)priorityQueue.remove((Object)elementB1));
            Assert.assertTrue((boolean)priorityQueue.remove((Object)elementB3));
            Assert.assertThat((Object)priorityQueue.peek(), (Matcher)org.hamcrest.Matchers.equalTo((Object)elementA42));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetKeys() throws Exception {
        int namespace1ElementsNum = 1000;
        int namespace2ElementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int expectedKey;
            PrimitiveIterator.OfInt actualIterator;
            String ns1 = "ns1";
            ValueState keyedState1 = (ValueState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.update((Object)(key * 2));
            }
            String ns2 = "ns2";
            ValueState keyedState2 = (ValueState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 1000; key < 2000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState2.update((Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)"ns1").sorted();){
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            keysStream = backend.getKeys(fieldName, (Object)"ns2").sorted();
            var10_12 = null;
            try {
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 1000; expectedKey < 2000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            catch (Throwable throwable) {
                var10_12 = throwable;
                throw throwable;
            }
            finally {
                if (keysStream != null) {
                    if (var10_12 != null) {
                        try {
                            keysStream.close();
                        }
                        catch (Throwable throwable) {
                            var10_12.addSuppressed(throwable);
                        }
                    } else {
                        keysStream.close();
                    }
                }
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetKeysAndNamespaces() throws Exception {
        int elementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            String ns1 = "ns1";
            ValueState keyedState1 = (ValueState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            String ns2 = "ns2";
            ValueState keyedState2 = (ValueState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.update((Object)(key * 2));
                keyedState2.update((Object)(key * 2));
            }
            try (Stream stream = backend.getKeysAndNamespaces(fieldName);){
                HashMap keysByNamespace = new HashMap();
                stream.forEach(entry -> {
                    Assert.assertThat((String)"Unexpected namespace", (Object)entry.f1, (Matcher)org.hamcrest.Matchers.isOneOf((Object[])new String[]{"ns1", "ns2"}));
                    Assert.assertThat((String)"Unexpected key", (Object)entry.f0, (Matcher)Is.is((Matcher)org.hamcrest.Matchers.both((Matcher)org.hamcrest.Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(0))).and(org.hamcrest.Matchers.lessThan((Comparable)Integer.valueOf(1000)))));
                    Set keys = keysByNamespace.computeIfAbsent(entry.f1, k -> new HashSet());
                    Assert.assertTrue((String)"Duplicate key for namespace", (boolean)keys.add(entry.f0));
                });
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            Assert.assertTrue((String)"Didn't see the expected Kryo exception.", (numExceptions > 0 ? 1 : 0) != 0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
            pojoType.createSerializer(this.env.getExecutionConfig());
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assert.assertTrue((boolean)(state instanceof InternalValueState));
            ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            Assert.assertTrue((String)"Didn't see the expected Kryo exception.", (numExceptions > 0 ? 1 : 0) != 0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBackendUsesRegisteredKryoSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            Assert.assertTrue((String)"Didn't see the expected Kryo exception.", (numExceptions > 0 ? 1 : 0) != 0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assert.assertTrue((boolean)(state instanceof InternalValueState));
            ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            Assert.assertTrue((String)"Didn't see the expected Kryo exception.", (numExceptions > 0 ? 1 : 0) != 0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            IOUtils.closeQuietly(backend);
            backend.dispose();
            this.env.getExecutionConfig().registerKryoType(TestPojo.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            snapshot.discardState();
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)state.value(), (Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)state.value(), (Object)new TestPojo("u2", 2));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect((Matcher)CoreMatchers.anyOf((Matcher)CoreMatchers.isA(ExpectedKryoTestException.class), (Matcher)org.hamcrest.Matchers.hasProperty((String)"cause", (Matcher)CoreMatchers.isA(ExpectedKryoTestException.class))));
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, this.env);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.value();
            snapshot2.discardState();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect((Matcher)CoreMatchers.anyOf((Matcher)CoreMatchers.isA(ExpectedKryoTestException.class), (Matcher)org.hamcrest.Matchers.hasProperty((String)"cause", (Matcher)CoreMatchers.isA(ExpectedKryoTestException.class))));
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, this.env);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.value();
        }
        finally {
            if (backend != null) {
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
        this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState internalKvState = (InternalKvState)state;
            KryoSerializer kryoSerializer = (KryoSerializer)internalKvState.getValueSerializer();
            int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
            int nestedPojoClassARegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId();
            int nestedPojoClassBRegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId();
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            this.env.close();
            this.env = this.buildMockEnv();
            this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
            this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            internalKvState = (InternalKvState)state;
            kryoSerializer = (KryoSerializer)internalKvState.getValueSerializer();
            Assert.assertEquals((long)mainPojoClassRegistrationId, (long)kryoSerializer.getKryo().getRegistration(TestPojo.class).getId());
            Assert.assertEquals((long)nestedPojoClassARegistrationId, (long)kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId());
            Assert.assertEquals((long)nestedPojoClassBRegistrationId, (long)kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId());
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
        this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            TypeInformation pojoType = TypeExtractor.getForClass(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof PojoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            this.env.close();
            this.env = this.buildMockEnv();
            this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
            this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer valueSerializer = kvId.getSerializer();
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.value());
            Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            Assert.assertNull((Object)state.value());
            Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            state.update((Object)"2");
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)state.value());
            Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            state.update((Object)"u1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"u2");
            backend.setCurrentKey((Object)3);
            state.update((Object)"u3");
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"u1", (Object)state.value());
            Assert.assertEquals((Object)"u1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"u2", (Object)state.value());
            Assert.assertEquals((Object)"u2", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)state.value());
            Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            ValueState restored1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)restored1.value());
            Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2", (Object)restored1.value());
            Assert.assertEquals((Object)"2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            ValueState restored2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"u1", (Object)restored2.value());
            Assert.assertEquals((Object)"u1", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"u2", (Object)restored2.value());
            Assert.assertEquals((Object)"u2", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)restored2.value());
            Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateWorkWithTtl() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", MutableLong.class);
            kvId.enableTimeToLive(StateTtlConfig.newBuilder((Time)Time.seconds((long)1L)).build());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new MutableLong());
            state.value();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateRace() throws Exception {
        Integer namespace = 1;
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        IntSerializer namespaceSerializer = IntSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            final ValueState state = (ValueState)backend.getPartitionedState((Object)namespace, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)kvId);
            TypeSerializer valueSerializer = kvId.getSerializer();
            final InternalKvState kvState = (InternalKvState)state;
            boolean key1 = true;
            backend.setCurrentKey((Object)1);
            kvState.setCurrentNamespace((Object)2);
            state.update((Object)"2");
            Assert.assertEquals((Object)"2", (Object)state.value());
            Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, namespace, IntSerializer.INSTANCE, valueSerializer));
            Assert.assertEquals((Object)"2", (Object)state.value());
            kvState.setCurrentNamespace((Object)namespace);
            int key2 = 10;
            backend.setCurrentKey((Object)10);
            Assert.assertNull((Object)state.value());
            Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 10, keySerializer, namespace, namespaceSerializer, valueSerializer));
            state.update((Object)"1");
            final CheckedThread getter = new CheckedThread("State getter"){

                public void go() throws Exception {
                    while (!this.isInterrupted()) {
                        Assert.assertEquals((Object)"1", (Object)state.value());
                    }
                }
            };
            final CheckedThread serializedGetter = new CheckedThread("Serialized state getter", (TypeSerializer)keySerializer, namespace, (TypeSerializer)namespaceSerializer, valueSerializer){
                final /* synthetic */ TypeSerializer val$keySerializer;
                final /* synthetic */ Integer val$namespace;
                final /* synthetic */ TypeSerializer val$namespaceSerializer;
                final /* synthetic */ TypeSerializer val$valueSerializer;
                {
                    this.val$keySerializer = typeSerializer;
                    this.val$namespace = n;
                    this.val$namespaceSerializer = typeSerializer2;
                    this.val$valueSerializer = typeSerializer3;
                    super(x0);
                }

                public void go() throws Exception {
                    while (!this.isInterrupted() && getter.isAlive()) {
                        String serializedValue = (String)StateBackendTestBase.getSerializedValue(kvState, 10, this.val$keySerializer, this.val$namespace, this.val$namespaceSerializer, this.val$valueSerializer);
                        Assert.assertEquals((Object)"1", (Object)serializedValue);
                    }
                }
            };
            getter.start();
            serializedGetter.start();
            Timer t = new Timer("stopper");
            t.schedule(new TimerTask(){

                @Override
                public void run() {
                    getter.interrupt();
                    serializedGetter.interrupt();
                    this.cancel();
                }
            }, 100L);
            serializedGetter.sync();
            getter.interrupt();
            getter.sync();
            t.cancel();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleValueStates() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ValueStateDescriptor desc1 = new ValueStateDescriptor("a-string", (TypeSerializer)StringSerializer.INSTANCE);
        ValueStateDescriptor desc2 = new ValueStateDescriptor("an-integer", (TypeSerializer)IntSerializer.INSTANCE);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), this.env);
        try {
            ValueState state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
            ValueState state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state1.value());
            Assert.assertNull((Object)state2.value());
            state1.update((Object)"1");
            Assert.assertEquals((Object)"1", (Object)state1.value());
            Assert.assertNull((Object)state2.value());
            state2.update((Object)13);
            Assert.assertEquals((Object)"1", (Object)state1.value());
            Assert.assertEquals((long)13L, (long)((Integer)state2.value()).intValue());
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot1), this.env);
            snapshot1.discardState();
            backend.setCurrentKey((Object)1);
            state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
            state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
            Assert.assertEquals((Object)"1", (Object)state1.value());
            Assert.assertEquals((long)13L, (long)((Integer)state2.value()).intValue());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateNullUpdate() throws Exception {
        try {
            LongSerializer.INSTANCE.serialize(null, (DataOutputView)new DataOutputViewStreamWrapper((OutputStream)new ByteArrayOutputStream()));
            Assert.fail((String)"Should fail with NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)LongSerializer.INSTANCE, (Object)42L);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((long)42L, (long)((Long)state.value()));
            state.update((Object)1L);
            Assert.assertEquals((long)1L, (long)((Long)state.value()));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((long)42L, (long)((Long)state.value()));
            backend.setCurrentKey((Object)1);
            state.clear();
            Assert.assertEquals((long)42L, (long)((Long)state.value()));
            state.update((Object)17L);
            Assert.assertEquals((long)17L, (long)((Long)state.value()));
            state.update(null);
            Assert.assertEquals((long)42L, (long)((Long)state.value()));
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer valueSerializer = kvId.getElementSerializer();
            Joiner joiner = Joiner.on((String)",");
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.get());
            Assert.assertNull(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            Assert.assertNull((Object)state.get());
            Assert.assertNull(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            state.update(Arrays.asList("2"));
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)state.get()));
            Assert.assertEquals((Object)"1", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            state.add((Object)"u1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"u2");
            backend.setCurrentKey((Object)3);
            state.add((Object)"u3");
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)state.get()));
            Assert.assertEquals((Object)"1,u1", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)state.get()));
            Assert.assertEquals((Object)"2,u2", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)state.get()));
            Assert.assertEquals((Object)"u3", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            ListState restored1 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)restored1.get()));
            Assert.assertEquals((Object)"1", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2", (Object)joiner.join((Iterable)restored1.get()));
            Assert.assertEquals((Object)"2", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            ListState restored2 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)restored2.get()));
            Assert.assertEquals((Object)"1,u1", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)restored2.get()));
            Assert.assertEquals((Object)"2,u2", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)restored2.get()));
            Assert.assertEquals((Object)"u3", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddNull() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            state.add(null);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddAllNullEntries() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.addAll(adding);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddAllNull() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            state.addAll(null);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateUpdateNullEntries() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.update(adding);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateUpdateNull() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            state.update(null);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAPIs() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{17L, 11L}));
            state.update(Collections.emptyList());
            Assert.assertNull((Object)state.get());
            state.update(Arrays.asList(10L, 16L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{16L, 10L}));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{16L, 10L}));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            Assert.assertNull((Object)state.get());
            state.addAll(Collections.emptyList());
            Assert.assertNull((Object)state.get());
            state.addAll(Arrays.asList(3L, 4L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L}));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L}));
            state.addAll(new ArrayList());
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L}));
            state.addAll(Arrays.asList(5L, 6L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L}));
            state.addAll(new ArrayList());
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L}));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L}));
            state.update(Arrays.asList(1L, 2L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{1L, 2L}));
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{10L, 16L}));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{1L, 2L, 3L, 2L, 1L}));
            state.update(Arrays.asList(5L, 6L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{5L, 6L}));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateMerging() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer valueSerializer = kvId.getSerializer();
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.get());
            Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            Assert.assertNull((Object)state.get());
            Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            state.add((Object)"2");
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)state.get());
            Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            state.add((Object)"u1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"u2");
            backend.setCurrentKey((Object)3);
            state.add((Object)"u3");
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)state.get());
            Assert.assertEquals((Object)"1,u1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)state.get());
            Assert.assertEquals((Object)"2,u2", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)state.get());
            Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            ReducingState restored1 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1", (Object)restored1.get());
            Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2", (Object)restored1.get());
            Assert.assertEquals((Object)"2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            ReducingState restored2 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"1,u1", (Object)restored2.get());
            Assert.assertEquals((Object)"1,u1", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)"2,u2", (Object)restored2.get());
            Assert.assertEquals((Object)"2,u2", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)"u3", (Object)restored2.get());
            Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateMerging() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, String.class);
        StringSerializer keySerializer = StringSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer userKeySerializer = kvId.getKeySerializer();
            TypeSerializer userValueSerializer = kvId.getValueSerializer();
            backend.setCurrentKey((Object)"1");
            Assert.assertNull((Object)state.get((Object)1));
            Assert.assertNull(StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            state.put((Object)1, (Object)"1");
            backend.setCurrentKey((Object)"2");
            Assert.assertNull((Object)state.get((Object)2));
            Assert.assertNull(StateBackendTestBase.getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            state.put((Object)2, (Object)"2");
            backend.setCurrentKey((Object)"11");
            state.put((Object)11, (Object)"11");
            backend.setCurrentKey((Object)"1");
            Assert.assertTrue((boolean)state.contains((Object)1));
            Assert.assertEquals((Object)"1", (Object)state.get((Object)1));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "1");
                }
            }, StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(11, "11");
                }
            }, StateBackendTestBase.getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)"1");
            state.put((Object)1, (Object)"101");
            backend.setCurrentKey((Object)"2");
            state.put((Object)102, (Object)"102");
            backend.setCurrentKey((Object)"3");
            state.put((Object)103, (Object)"103");
            state.putAll((Map)new HashMap<Integer, String>(){
                {
                    this.put(1031, "1031");
                    this.put(1032, "1032");
                }
            });
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.setCurrentKey((Object)"1");
            Assert.assertEquals((Object)"101", (Object)state.get((Object)1));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "101");
                }
            }, StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            backend.setCurrentKey((Object)"2");
            Assert.assertEquals((Object)"102", (Object)state.get((Object)102));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(2, "2");
                    this.put(102, "102");
                }
            }, StateBackendTestBase.getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            backend.setCurrentKey((Object)"3");
            Assert.assertTrue((boolean)state.contains((Object)103));
            Assert.assertEquals((Object)"103", (Object)state.get((Object)103));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(103, "103");
                    this.put(1031, "1031");
                    this.put(1032, "1032");
                }
            }, StateBackendTestBase.getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            ArrayList<Integer> keys = new ArrayList<Integer>();
            for (Integer key : state.keys()) {
                keys.add(key);
            }
            List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
            Assert.assertEquals((long)keys.size(), (long)expectedKeys.size());
            keys.removeAll(expectedKeys);
            ArrayList<String> values = new ArrayList<String>();
            for (String value : state.values()) {
                values.add(value);
            }
            List<String> expectedValues = Arrays.asList("103", "1031", "1032");
            Assert.assertEquals((long)values.size(), (long)expectedValues.size());
            values.removeAll(expectedValues);
            backend.setCurrentKey((Object)"1");
            state.clear();
            backend.setCurrentKey((Object)"2");
            state.remove((Object)102);
            backend.setCurrentKey((Object)"3");
            String updateSuffix = "_updated";
            Iterator iterator = state.iterator();
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                if (((String)entry.getValue()).length() != 4) {
                    iterator.remove();
                    continue;
                }
                entry.setValue((String)entry.getValue() + "_updated");
            }
            backend.setCurrentKey((Object)"1");
            backend.setCurrentKey((Object)"2");
            Assert.assertFalse((boolean)state.contains((Object)102));
            backend.setCurrentKey((Object)"3");
            for (Map.Entry entry : state.entries()) {
                Assert.assertEquals((long)(4 + "_updated".length()), (long)((String)entry.getValue()).length());
                Assert.assertTrue((boolean)((String)entry.getValue()).endsWith("_updated"));
            }
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            MapState restored1 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)"1");
            Assert.assertEquals((Object)"1", (Object)restored1.get((Object)1));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "1");
                }
            }, StateBackendTestBase.getSerializedMap(restoredKvState1, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            backend.setCurrentKey((Object)"2");
            Assert.assertEquals((Object)"2", (Object)restored1.get((Object)2));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(2, "2");
                }
            }, StateBackendTestBase.getSerializedMap(restoredKvState1, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            MapState restored2 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)"1");
            Assert.assertEquals((Object)"101", (Object)restored2.get((Object)1));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "101");
                }
            }, StateBackendTestBase.getSerializedMap(restoredKvState2, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            backend.setCurrentKey((Object)"2");
            Assert.assertEquals((Object)"102", (Object)restored2.get((Object)102));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(2, "2");
                    this.put(102, "102");
                }
            }, StateBackendTestBase.getSerializedMap(restoredKvState2, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
            backend.setCurrentKey((Object)"3");
            Assert.assertEquals((Object)"103", (Object)restored2.get((Object)103));
            Assert.assertEquals((Object)new HashMap<Integer, String>(){
                {
                    this.put(103, "103");
                    this.put(1031, "1031");
                    this.put(1032, "1032");
                }
            }, StateBackendTestBase.getSerializedMap(restoredKvState2, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateIsEmpty() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int i;
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertTrue((boolean)state.isEmpty());
            int stateSize = 1024;
            for (i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
                Assert.assertFalse((boolean)state.isEmpty());
            }
            for (i = 0; i < stateSize; ++i) {
                Assert.assertFalse((boolean)state.isEmpty());
                state.remove((Object)i);
            }
            Assert.assertTrue((boolean)state.isEmpty());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateIteratorArbitraryAccess() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            int stateSize = 4096;
            for (int i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
            }
            Iterator iterator = state.iterator();
            int iteratorCount = 0;
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                Assert.assertEquals((long)iteratorCount, (long)((Integer)entry.getKey()).intValue());
                switch (ThreadLocalRandom.current().nextInt() % 3) {
                    case 0: {
                        iterator.remove();
                        try {
                            iterator.remove();
                            Assert.fail();
                        }
                        catch (IllegalStateException illegalStateException) {}
                        break;
                    }
                    case 1: {
                        iterator.hasNext();
                        iterator.remove();
                        break;
                    }
                }
                ++iteratorCount;
            }
            Assert.assertEquals((long)stateSize, (long)iteratorCount);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateNullAsDefaultValue() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.value());
            state.update((Object)"Ciao");
            Assert.assertEquals((Object)"Ciao", (Object)state.value());
            state.clear();
            Assert.assertNull((Object)state.value());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateDefaultValue() throws Exception {
        KeyedStateHandle keyedStateHandle;
        ValueState state;
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, (Object)"Hello");
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Hello", (Object)state.value());
            state.update((Object)"Ciao");
            Assert.assertEquals((Object)"Ciao", (Object)state.value());
            state.clear();
            Assert.assertEquals((Object)"Hello", (Object)state.value());
            keyedStateHandle = this.runSnapshot(backend.snapshot(1L, 1L, this.createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation()), new SharedStateRegistry());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
        try {
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, keyedStateHandle);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Hello", (Object)state.value());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateDefaultValue() throws Exception {
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.get());
            state.add((Object)"Ciao");
            Assert.assertEquals((Object)"Ciao", (Object)state.get());
            state.clear();
            Assert.assertNull((Object)state.get());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateDefaultValue() throws Exception {
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.get());
            state.update(Arrays.asList("Ciao", "Bello"));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new String[]{"Ciao", "Bello"}));
            state.clear();
            Assert.assertNull((Object)state.get());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateDefaultValue() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", String.class, String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNotNull((Object)state.entries());
            Assert.assertFalse((boolean)state.entries().iterator().hasNext());
            state.put((Object)"Ciao", (Object)"Hello");
            state.put((Object)"Bello", (Object)"Nice");
            Assert.assertNotNull((Object)state.entries());
            Assert.assertEquals((Object)state.get((Object)"Ciao"), (Object)"Hello");
            Assert.assertEquals((Object)state.get((Object)"Bello"), (Object)"Nice");
            state.clear();
            Assert.assertNotNull((Object)state.entries());
            Assert.assertFalse((boolean)state.entries().iterator().hasNext());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSnapshotNonAccessedState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        String stateName = "test-name";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("test-name", Integer.class, String.class);
            MapState mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)"1");
            mapState.put((Object)11, (Object)"foo");
            backend.setCurrentKey((Object)"2");
            mapState.put((Object)8, (Object)"bar");
            backend.setCurrentKey((Object)"3");
            mapState.put((Object)91, (Object)"hello world");
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot);
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot);
            mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)"1");
            Assert.assertEquals((Object)"foo", (Object)mapState.get((Object)11));
            backend.setCurrentKey((Object)"2");
            Assert.assertEquals((Object)"bar", (Object)mapState.get((Object)8));
            backend.setCurrentKey((Object)"3");
            Assert.assertEquals((Object)"hello world", (Object)mapState.get((Object)91));
            snapshot.discardState();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @Test
    public void testKeyGroupSnapshotRestoreScaleDown() throws Exception {
        this.testKeyGroupSnapshotRestore(4, 2, 128);
    }

    @Test
    public void testKeyGroupSnapshotRestoreScaleUp() throws Exception {
        this.testKeyGroupSnapshotRestore(2, 4, 128);
    }

    @Test
    public void testKeyGroupsSnapshotRestoreNoRescale() throws Exception {
        this.testKeyGroupSnapshotRestore(2, 2, 128);
    }

    @Test
    public void testKeyGroupsSnapshotRestoreScaleUpUnEvenDistribute() throws Exception {
        this.testKeyGroupSnapshotRestore(15, 77, 128);
    }

    @Test
    public void testKeyGroupsSnapshotRestoreScaleDownUnEvenDistribute() throws Exception {
        this.testKeyGroupSnapshotRestore(77, 15, 128);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyGroupSnapshotRestore(int sourceParallelism, int targetParallelism, int maxParallelism) throws Exception {
        int i;
        Preconditions.checkArgument((sourceParallelism > 0 ? 1 : 0) != 0, (String)"parallelism must be positive, current is %s.", (Object[])new Object[]{sourceParallelism});
        Preconditions.checkArgument((targetParallelism > 0 ? 1 : 0) != 0, (String)"parallelism must be positive, current is %s.", (Object[])new Object[]{targetParallelism});
        Preconditions.checkArgument((sourceParallelism <= maxParallelism ? 1 : 0) != 0, (Object)"Maximum parallelism must not be smaller than parallelism.");
        Preconditions.checkArgument((targetParallelism <= maxParallelism ? 1 : 0) != 0, (Object)"Maximum parallelism must not be smaller than parallelism.");
        Random random = new Random();
        ArrayList<ValueStateDescriptor> stateDescriptors = new ArrayList<ValueStateDescriptor>(maxParallelism);
        ArrayList<Integer> keyInKeyGroups = new ArrayList<Integer>(maxParallelism);
        ArrayList<String> expectedValue = new ArrayList<String>(maxParallelism);
        for (int i2 = 0; i2 < maxParallelism; ++i2) {
            stateDescriptors.add(new ValueStateDescriptor("state" + i2, String.class));
        }
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ArrayList<KeyedStateHandle> snapshots = new ArrayList<KeyedStateHandle>(sourceParallelism);
        for (int i3 = 0; i3 < sourceParallelism; ++i3) {
            KeyGroupRange range = KeyGroupRange.of((int)(maxParallelism * i3 / sourceParallelism), (int)(maxParallelism * (i3 + 1) / sourceParallelism - 1));
            CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, maxParallelism, range, this.env);
            try {
                for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) {
                    ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptors.get(j));
                    int keyInKeyGroup = this.getKeyInKeyGroup(random, maxParallelism, KeyGroupRange.of((int)j, (int)j));
                    backend.setCurrentKey((Object)keyInKeyGroup);
                    keyInKeyGroups.add(keyInKeyGroup);
                    String updateValue = i3 + ":" + j;
                    state.update((Object)updateValue);
                    expectedValue.add(updateValue);
                }
                snapshots.add(this.runSnapshot(backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry));
                continue;
            }
            finally {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        ArrayList<KeyGroupRange> keyGroupRangesRestore = new ArrayList<KeyGroupRange>();
        for (int i4 = 0; i4 < targetParallelism; ++i4) {
            keyGroupRangesRestore.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)maxParallelism, (int)targetParallelism, (int)i4));
        }
        ArrayList keyGroupStatesAfterDistribute = new ArrayList(targetParallelism);
        for (i = 0; i < targetParallelism; ++i) {
            ArrayList keyedStateHandles = new ArrayList();
            StateAssignmentOperation.extractIntersectingState(snapshots, (KeyGroupRange)((KeyGroupRange)keyGroupRangesRestore.get(i)), keyedStateHandles);
            keyGroupStatesAfterDistribute.add(keyedStateHandles);
        }
        for (i = 0; i < targetParallelism; ++i) {
            CheckpointableKeyedStateBackend backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, maxParallelism, (KeyGroupRange)keyGroupRangesRestore.get(i), (List)keyGroupStatesAfterDistribute.get(i), this.env);
            try {
                KeyGroupRange range = (KeyGroupRange)keyGroupRangesRestore.get(i);
                for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) {
                    ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptors.get(j));
                    backend.setCurrentKey(keyInKeyGroups.get(j));
                    Assert.assertEquals(expectedValue.get(j), (Object)state.value());
                }
                continue;
            }
            finally {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestoreWithWrongKeySerializer() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        KeyedStateHandle snapshot = null;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"2");
            snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
        try {
            this.restoreKeyedBackend((TypeSerializer)DoubleSerializer.INSTANCE, snapshot);
            Assert.fail((String)"should recognize wrong key serializer");
        }
        catch (StateMigrationException state) {
        }
        catch (BackendBuildingException ignored) {
            Assert.assertTrue((boolean)(ignored.getCause() instanceof StateMigrationException));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"2");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ValueStateDescriptor("id", (TypeSerializer)fakeStringSerializer);
                state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.value();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ListStateDescriptor("id", (TypeSerializer)fakeStringSerializer);
                state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.get();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)StringSerializer.INSTANCE);
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)fakeStringSerializer);
                state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.get();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateRestoreWithWrongSerializers() throws Exception {
        Assume.assumeTrue((boolean)this.supportsMetaInfoVerification());
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("id", (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE);
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.put((Object)"1", (Object)"First");
            backend.setCurrentKey((Object)2);
            state.put((Object)"2", (Object)"Second");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new MapStateDescriptor("id", (TypeSerializer)fakeStringSerializer, (TypeSerializer)StringSerializer.INSTANCE);
                state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.entries();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCopyDefaultValue() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            IntValue default1 = (IntValue)state.value();
            backend.setCurrentKey((Object)2);
            IntValue default2 = (IntValue)state.value();
            Assert.assertNotNull((Object)default1);
            Assert.assertNotNull((Object)default2);
            Assert.assertEquals((Object)default1, (Object)default2);
            Assert.assertFalse((default1 == default2 ? 1 : 0) != 0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequireNonNullNamespace() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            try {
                backend.getPartitionedState(null, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                Assert.fail((String)"Did not throw expected NullPointerException");
            }
            catch (NullPointerException nullPointerException) {
                // empty catch block
            }
            try {
                backend.getPartitionedState((Object)VoidNamespace.INSTANCE, null, (StateDescriptor)kvId);
                Assert.fail((String)"Did not throw expected NullPointerException");
            }
            catch (NullPointerException nullPointerException) {
                // empty catch block
            }
            try {
                backend.getPartitionedState(null, null, (StateDescriptor)kvId);
                Assert.fail((String)"Did not throw expected NullPointerException");
            }
            catch (NullPointerException nullPointerException) {
                // empty catch block
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void testConcurrentMapIfQueryable() throws Exception {
        boolean numberOfKeyGroups = true;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment());
        try {
            ValueStateDescriptor desc = new ValueStateDescriptor("value-state", Integer.class, (Object)-1);
            desc.setQueryable("my-query");
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            InternalKvState kvState = (InternalKvState)state;
            Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.update((Object)121818273);
            StateTable stateTable = ((AbstractHeapState)kvState).getStateTable();
            this.checkConcurrentStateTable(stateTable, 1);
            desc = new ListStateDescriptor("list-state", Integer.class);
            desc.setQueryable("my-query");
            state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            kvState = (InternalKvState)state;
            Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.add((Object)121818273);
            stateTable = ((AbstractHeapState)kvState).getStateTable();
            this.checkConcurrentStateTable(stateTable, 1);
            desc = new ReducingStateDescriptor("reducing-state", (ReduceFunction)new ReduceFunction<Integer>(){

                public Integer reduce(Integer value1, Integer value2) throws Exception {
                    return value1 + value2;
                }
            }, Integer.class);
            desc.setQueryable("my-query");
            state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            kvState = (InternalKvState)state;
            Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.add((Object)121818273);
            stateTable = ((AbstractHeapState)kvState).getStateTable();
            this.checkConcurrentStateTable(stateTable, 1);
            desc = new MapStateDescriptor("map-state", Integer.class, String.class);
            desc.setQueryable("my-query");
            state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            kvState = (InternalKvState)state;
            Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.put((Object)121818273, (Object)"121818273");
            int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)1);
            StateTable stateTable2 = ((AbstractHeapState)kvState).getStateTable();
            Assert.assertNotNull((String)"State not set", (Object)stateTable2.get((Object)keyGroupIndex));
            this.checkConcurrentStateTable(stateTable2, 1);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    private void checkConcurrentStateTable(StateTable<?, ?, ?> stateTable, int numberOfKeyGroups) {
        Assert.assertNotNull((String)"State not set", stateTable);
        if (stateTable instanceof NestedMapsStateTable) {
            int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)numberOfKeyGroups);
            NestedMapsStateTable nestedMapsStateTable = (NestedMapsStateTable)stateTable;
            NestedStateMap[] nestedStateMaps = (NestedStateMap[])nestedMapsStateTable.getState();
            Assert.assertTrue((boolean)(nestedStateMaps[keyGroupIndex].getNamespaceMap() instanceof ConcurrentHashMap));
            Assert.assertTrue((boolean)(nestedStateMaps[keyGroupIndex].getNamespaceMap().get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testQueryableStateRegistration() throws Exception {
        KvStateRegistry registry = this.env.getKvStateRegistry();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
            KvStateRegistryListener listener = (KvStateRegistryListener)Mockito.mock(KvStateRegistryListener.class);
            registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, listener);
            ValueStateDescriptor desc = new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("banana");
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyKvStateRegistered((JobID)Matchers.eq((Object)this.env.getJobID()), (JobVertexID)Matchers.eq((Object)this.env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"), (KvStateID)Matchers.any(KvStateID.class));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyKvStateUnregistered((JobID)Matchers.eq((Object)this.env.getJobID()), (JobVertexID)Matchers.eq((Object)this.env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            if (snapshot != null) {
                snapshot.discardState();
            }
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)2))).notifyKvStateRegistered((JobID)Matchers.eq((Object)this.env.getJobID()), (JobVertexID)Matchers.eq((Object)this.env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"), (KvStateID)Matchers.any(KvStateID.class));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEmptyStateCheckpointing() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            CheckpointStreamFactory streamFactory = this.createStreamFactory();
            SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462379L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            Assert.assertNull((Object)snapshot);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNumStateEntries() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            Assert.assertEquals((long)0L, (long)((TestableKeyedStateBackend)backend).numKeyValueStateEntries());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)0);
            state.update((Object)"hello");
            state.update((Object)"ciao");
            Assert.assertEquals((long)1L, (long)((TestableKeyedStateBackend)backend).numKeyValueStateEntries());
            backend.setCurrentKey((Object)42);
            state.update((Object)"foo");
            Assert.assertEquals((long)2L, (long)((TestableKeyedStateBackend)backend).numKeyValueStateEntries());
            backend.setCurrentKey((Object)0);
            state.clear();
            Assert.assertEquals((long)1L, (long)((TestableKeyedStateBackend)backend).numKeyValueStateEntries());
            backend.setCurrentKey((Object)42);
            state.clear();
            Assert.assertEquals((long)0L, (long)((TestableKeyedStateBackend)backend).numKeyValueStateEntries());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testParallelAsyncSnapshots() throws Exception {
        Assume.assumeTrue((boolean)this.snapshotUsesStreamFactory());
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!this.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot1 = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner1 = new Thread((Runnable)snapshot1, "snapshot-1-runner");
            runner1.start();
            waiter.await();
            for (int i = 5; i < 15; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
            }
            streamFactory.setWaiterLatch(null);
            streamFactory.setBlockerLatch(null);
            RunnableFuture snapshot2 = backend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner2 = new Thread((Runnable)snapshot2, "snapshot-2-runner");
            runner2.start();
            snapshot2.get();
            blocker.trigger();
            snapshot1.get();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @Test
    public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        CheckpointableKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            new StateSnapshotTransformerTest(backend, streamFactory).testNonConcurrentSnapshotTransformerAccess();
        }
        finally {
            if (backend != null) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncSnapshot() throws Exception {
        InternalValueState valueState;
        Assume.assumeTrue((boolean)this.snapshotUsesStreamFactory());
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        CheckpointableKeyedStateBackend backend = null;
        KeyedStateHandle stateHandle = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();
            for (int i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
                if (10 != i) continue;
                waiter.await();
            }
            runner.join();
            SnapshotResult snapshotResult = (SnapshotResult)snapshot.get();
            stateHandle = (KeyedStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            for (int i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                Assert.assertEquals((long)(i + 1), (long)((Integer)valueState.value()).intValue());
            }
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        Assert.assertNotNull((Object)stateHandle);
        backend = null;
        try {
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, stateHandle);
            valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                Assert.assertEquals((long)i, (long)((Integer)valueState.value()).intValue());
            }
            backend.setCurrentKey((Object)11);
            Assert.assertNull((Object)valueState.value());
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentModificationWithApplyToAllKeys() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("foo", (TypeSerializer)StringSerializer.INSTANCE);
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            for (int i = 0; i < 100; ++i) {
                backend.setCurrentKey((Object)i);
                listState.add((Object)("Hello" + i));
            }
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Assert.assertEquals((Object)("Hello" + key), ((Iterable)state.get()).iterator().next());
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    state.clear();
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Assert.assertFalse((boolean)((Iterable)state.get()).iterator().hasNext());
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    state.add((Object)("Hello" + key));
                    state.clear();
                    state.add((Object)("Hello_" + key));
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Iterator it = ((Iterable)state.get()).iterator();
                    Assert.assertEquals((Object)("Hello_" + key), it.next());
                    Assert.assertFalse((boolean)it.hasNext());
                }
            });
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testApplyToAllKeysLambdaFunction() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("foo", (TypeSerializer)StringSerializer.INSTANCE);
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            for (int i = 0; i < 100; ++i) {
                backend.setCurrentKey((Object)i);
                listState.add((Object)("Hello" + i));
            }
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (key, state) -> Assert.assertEquals((Object)("Hello" + key), ((Iterable)state.get()).iterator().next()));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncSnapshotCancellation() throws Exception {
        Assume.assumeTrue((boolean)this.snapshotUsesStreamFactory());
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!this.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();
            waiter.await();
            IOUtils.closeQuietly(backend);
            blocker.trigger();
            runner.join();
            try {
                snapshot.get();
                Assert.fail((String)"Close was not propagated.");
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateGetKeys() throws Exception {
        int namespace1ElementsNum = 1000;
        int namespace2ElementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int expectedKey;
            PrimitiveIterator.OfInt actualIterator;
            String ns1 = "ns1";
            MapState keyedState1 = (MapState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.put((Object)"he", (Object)(key * 2));
                keyedState1.put((Object)"ho", (Object)(key * 2));
            }
            String ns2 = "ns2";
            MapState keyedState2 = (MapState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 1000; key < 2000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState2.put((Object)"he", (Object)(key * 2));
                keyedState2.put((Object)"ho", (Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)"ns1").sorted();){
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            keysStream = backend.getKeys(fieldName, (Object)"ns2").sorted();
            var10_12 = null;
            try {
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 1000; expectedKey < 2000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            catch (Throwable throwable) {
                var10_12 = throwable;
                throw throwable;
            }
            finally {
                if (keysStream != null) {
                    if (var10_12 != null) {
                        try {
                            keysStream.close();
                        }
                        catch (Throwable throwable) {
                            var10_12.addSuppressed(throwable);
                        }
                    } else {
                        keysStream.close();
                    }
                }
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            long checkpointID = 0L;
            ArrayList<Future<SnapshotResult<KeyedStateHandle>>> futureList = new ArrayList<Future<SnapshotResult<KeyedStateHandle>>>();
            for (int i = 0; i < 10; ++i) {
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id" + i, (TypeSerializer)IntSerializer.INSTANCE);
                ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)valueStateDescriptor);
                ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
                backend.setCurrentKey((Object)i);
                state.update((Object)i);
                futureList.add(this.runSnapshotAsync(executorService, backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
            }
            for (Future future : futureList) {
                future.get(20L, TimeUnit.SECONDS);
            }
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
            executorService.shutdown();
        }
    }

    protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(ExecutorService executorService, RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            CompletableFuture<SnapshotResult<KeyedStateHandle>> completableFuture = new CompletableFuture<SnapshotResult<KeyedStateHandle>>();
            executorService.submit(() -> {
                try {
                    snapshotRunnableFuture.run();
                    completableFuture.complete((SnapshotResult<KeyedStateHandle>)snapshotRunnableFuture.get());
                }
                catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            });
            return completableFuture;
        }
        return CompletableFuture.completedFuture(snapshotRunnableFuture.get());
    }

    private int getKeyInKeyGroup(Random random, int maxParallelism, KeyGroupRange keyGroupRange) {
        int keyInKG = random.nextInt();
        int kg = KeyGroupRangeAssignment.assignToKeyGroup((Object)keyInKG, (int)maxParallelism);
        while (!keyGroupRange.contains(kg)) {
            keyInKG = random.nextInt();
            kg = KeyGroupRangeAssignment.assignToKeyGroup((Object)keyInKG, (int)maxParallelism);
        }
        return keyInKG;
    }

    protected static <V, K, N> V getSerializedValue(InternalKvState<K, N, V> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return (V)KvStateSerializer.deserializeValue((byte[])serializedValue, valueSerializer);
    }

    private static <V, K, N> List<V> getSerializedList(InternalKvState<K, N, V> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeList((byte[])serializedValue, valueSerializer);
    }

    private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(InternalKvState<K, N, Map<UK, UV>> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<UK> userKeySerializer, TypeSerializer<UV> userValueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeMap((byte[])serializedValue, userKeySerializer, userValueSerializer);
    }

    protected KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        SnapshotResult snapshotResult;
        KeyedStateHandle jobManagerOwnedSnapshot;
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        if ((jobManagerOwnedSnapshot = (KeyedStateHandle)(snapshotResult = (SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot()) != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
        }
        return jobManagerOwnedSnapshot;
    }

    private MockEnvironment buildMockEnv() throws IOException {
        return MockEnvironment.builder().setTaskStateManager(this.getTestTaskStateManager()).build();
    }

    protected TestTaskStateManager getTestTaskStateManager() throws IOException {
        return TestTaskStateManager.builder().build();
    }

    protected boolean snapshotUsesStreamFactory() {
        return true;
    }

    protected boolean supportsMetaInfoVerification() {
        return true;
    }

    public static final class MutableLong {
        long value;
    }

    private static class ImmutableAggregatingAddingFunction
    implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            accumulator = accumulator + value;
            return accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static class MutableAggregatingAddingFunction
    implements AggregateFunction<Long, MutableLong, Long> {
        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        public Long getResult(MutableLong accumulator) {
            return accumulator.value;
        }

        public MutableLong merge(MutableLong a, MutableLong b) {
            a.value += b.value;
            return a;
        }
    }

    public static class CustomKryoTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            super.write(kryo, output, object);
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    public static class ExceptionThrowingTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            throw new ExpectedKryoTestException();
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    private static class ExpectedKryoTestException
    extends RuntimeException {
        private ExpectedKryoTestException() {
        }
    }

    public static class TestNestedPojoClassB
    implements Serializable {
        private Double doubleField;
        private String strField;

        public TestNestedPojoClassB() {
        }

        public TestNestedPojoClassB(Double doubleField, String strField) {
            this.doubleField = doubleField;
            this.strField = strField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public String toString() {
            return "TestNestedPojoClassB{doubleField='" + this.doubleField + '\'' + ", strField=" + this.strField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassB testNestedPojoClassB = (TestNestedPojoClassB)o;
            if (!this.doubleField.equals(testNestedPojoClassB.doubleField)) {
                return false;
            }
            return this.strField.equals(testNestedPojoClassB.strField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.strField.hashCode();
            return result;
        }
    }

    public static class TestNestedPojoClassA
    implements Serializable {
        private Double doubleField;
        private Integer intField;

        public TestNestedPojoClassA() {
        }

        public TestNestedPojoClassA(Double doubleField, Integer intField) {
            this.doubleField = doubleField;
            this.intField = intField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public String toString() {
            return "TestNestedPojoClassA{doubleField='" + this.doubleField + '\'' + ", intField=" + this.intField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassA testNestedPojoClassA = (TestNestedPojoClassA)o;
            if (!this.doubleField.equals(testNestedPojoClassA.doubleField)) {
                return false;
            }
            return this.intField.equals(testNestedPojoClassA.intField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.intField.hashCode();
            return result;
        }
    }

    public static class TestPojo
    implements Serializable {
        private String strField;
        private Integer intField;
        private TestNestedPojoClassA kryoClassAField;
        private TestNestedPojoClassB kryoClassBField;

        public TestPojo() {
        }

        public TestPojo(String strField, Integer intField) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = null;
            this.kryoClassBField = null;
        }

        public TestPojo(String strField, Integer intField, TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = classAField;
            this.kryoClassBField = classBfield;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public TestNestedPojoClassA getKryoClassAField() {
            return this.kryoClassAField;
        }

        public void setKryoClassAField(TestNestedPojoClassA kryoClassAField) {
            this.kryoClassAField = kryoClassAField;
        }

        public TestNestedPojoClassB getKryoClassBField() {
            return this.kryoClassBField;
        }

        public void setKryoClassBField(TestNestedPojoClassB kryoClassBField) {
            this.kryoClassBField = kryoClassBField;
        }

        public String toString() {
            return "TestPojo{strField='" + this.strField + '\'' + ", intField=" + this.intField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestPojo testPojo = (TestPojo)o;
            return this.strField.equals(testPojo.strField) && this.intField.equals(testPojo.intField) && (this.kryoClassAField == null && testPojo.kryoClassAField == null || this.kryoClassAField.equals(testPojo.kryoClassAField)) && (this.kryoClassBField == null && testPojo.kryoClassBField == null || this.kryoClassBField.equals(testPojo.kryoClassBField));
        }

        public int hashCode() {
            int result = this.strField.hashCode();
            result = 31 * result + this.intField.hashCode();
            if (this.kryoClassAField != null) {
                result = 31 * result + this.kryoClassAField.hashCode();
            }
            if (this.kryoClassBField != null) {
                result = 31 * result + this.kryoClassBField.hashCode();
            }
            return result;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }
}

