package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/state/StateBackendMigrationTestBase.class */
public abstract class StateBackendMigrationTestBase<B extends AbstractStateBackend> extends TestLogger {

    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    private CheckpointStorageLocation checkpointStorageLocation;

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendMigrationTestBase$CustomVoidNamespaceSerializer.class */
    public static class CustomVoidNamespaceSerializer extends TypeSerializer<VoidNamespace> {
        private static final long serialVersionUID = 1;
        public static final CustomVoidNamespaceSerializer INSTANCE = new CustomVoidNamespaceSerializer();

        public boolean isImmutableType() {
            return true;
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        public VoidNamespace m361createInstance() {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace voidNamespace) {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace voidNamespace, VoidNamespace voidNamespace2) {
            return VoidNamespace.get();
        }

        public int getLength() {
            return 0;
        }

        public void serialize(VoidNamespace voidNamespace, DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(0);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public VoidNamespace m360deserialize(DataInputView dataInputView) throws IOException {
            dataInputView.readByte();
            return VoidNamespace.get();
        }

        public VoidNamespace deserialize(VoidNamespace voidNamespace, DataInputView dataInputView) throws IOException {
            dataInputView.readByte();
            return VoidNamespace.get();
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(dataInputView.readByte());
        }

        public TypeSerializer<VoidNamespace> duplicate() {
            return this;
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializer;
        }

        public int hashCode() {
            return getClass().hashCode();
        }

        public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
            return new CustomVoidNamespaceSerializerSnapshot();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendMigrationTestBase$CustomVoidNamespaceSerializerSnapshot.class */
    public static class CustomVoidNamespaceSerializerSnapshot implements TypeSerializerSnapshot<VoidNamespace> {
        public TypeSerializer<VoidNamespace> restoreSerializer() {
            return new CustomVoidNamespaceSerializer();
        }

        public TypeSerializerSchemaCompatibility<VoidNamespace> resolveSchemaCompatibility(TypeSerializer<VoidNamespace> typeSerializer) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        public void writeSnapshot(DataOutputView dataOutputView) throws IOException {
        }

        public void readSnapshot(int i, DataInputView dataInputView, ClassLoader classLoader) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializerSnapshot;
        }

        public int hashCode() {
            return 0;
        }

        public int getCurrentVersion() {
            return 0;
        }
    }

    protected abstract B getStateBackend() throws Exception;

    @Test
    public void testKeyedValueStateMigration() throws Exception {
        testKeyedValueStateUpgrade(new ValueStateDescriptor<>("test-name", new TestType.V1TestTypeSerializer()), new ValueStateDescriptor<>("test-name", new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testKeyedValueStateSerializerReconfiguration() throws Exception {
        testKeyedValueStateUpgrade(new ValueStateDescriptor<>("test-name", new TestType.V1TestTypeSerializer()), new ValueStateDescriptor<>("test-name", new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        try {
            testKeyedValueStateUpgrade(new ValueStateDescriptor<>("test-name", new TestType.V1TestTypeSerializer()), new ValueStateDescriptor<>("test-name", new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testKeyedValueStateUpgrade(ValueStateDescriptor<TestType> valueStateDescriptor, ValueStateDescriptor<TestType> valueStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.update(new TestType("foo", 1456));
            createKeyedBackend.setCurrentKey(2);
            partitionedState.update(new TestType("bar", 478));
            createKeyedBackend.setCurrentKey(3);
            partitionedState.update(new TestType("hello", 189));
            KeyedStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            createKeyedBackend.dispose();
            createKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            ValueState partitionedState2 = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, valueStateDescriptor2);
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals(new TestType("foo", 1456), partitionedState2.value());
            partitionedState2.update(new TestType("newValue1", 751));
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals(new TestType("bar", 478), partitionedState2.value());
            partitionedState2.update(new TestType("newValue2", 167));
            createKeyedBackend.setCurrentKey(3);
            Assert.assertEquals(new TestType("hello", 189), partitionedState2.value());
            partitionedState2.update(new TestType("newValue3", 444));
            runSnapshot(createKeyedBackend.snapshot(2L, 3L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry).discardState();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testKeyedListStateMigration() throws Exception {
        testKeyedListStateUpgrade(new ListStateDescriptor<>("test-name", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("test-name", new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testKeyedListStateSerializerReconfiguration() throws Exception {
        testKeyedListStateUpgrade(new ListStateDescriptor<>("test-name", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("test-name", new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        try {
            testKeyedListStateUpgrade(new ListStateDescriptor<>("test-name", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("test-name", new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testKeyedListStateUpgrade(ListStateDescriptor<TestType> listStateDescriptor, ListStateDescriptor<TestType> listStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ListState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, listStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.add(new TestType("key-1", 1));
            partitionedState.add(new TestType("key-1", 2));
            partitionedState.add(new TestType("key-1", 3));
            createKeyedBackend.setCurrentKey(2);
            partitionedState.add(new TestType("key-2", 1));
            createKeyedBackend.setCurrentKey(3);
            partitionedState.add(new TestType("key-3", 1));
            partitionedState.add(new TestType("key-3", 2));
            KeyedStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            createKeyedBackend.dispose();
            createKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            ListState partitionedState2 = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, listStateDescriptor2);
            createKeyedBackend.setCurrentKey(1);
            Iterator it = ((Iterable) partitionedState2.get()).iterator();
            Assert.assertEquals(new TestType("key-1", 1), it.next());
            Assert.assertEquals(new TestType("key-1", 2), it.next());
            Assert.assertEquals(new TestType("key-1", 3), it.next());
            Assert.assertFalse(it.hasNext());
            partitionedState2.add(new TestType("new-key-1", 123));
            createKeyedBackend.setCurrentKey(2);
            Iterator it2 = ((Iterable) partitionedState2.get()).iterator();
            Assert.assertEquals(new TestType("key-2", 1), it2.next());
            Assert.assertFalse(it2.hasNext());
            partitionedState2.add(new TestType("new-key-2", 456));
            createKeyedBackend.setCurrentKey(3);
            Iterator it3 = ((Iterable) partitionedState2.get()).iterator();
            Assert.assertEquals(new TestType("key-3", 1), it3.next());
            Assert.assertEquals(new TestType("key-3", 2), it3.next());
            Assert.assertFalse(it3.hasNext());
            partitionedState2.add(new TestType("new-key-3", 777));
            runSnapshot(createKeyedBackend.snapshot(2L, 3L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry).discardState();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testKeyedMapStateAsIs() throws Exception {
        testKeyedMapStateUpgrade(new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()));
    }

    @Test
    public void testKeyedMapStateStateMigration() throws Exception {
        testKeyedMapStateUpgrade(new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testKeyedMapStateSerializerReconfiguration() throws Exception {
        testKeyedMapStateUpgrade(new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testKeyedMapStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        try {
            testKeyedMapStateUpgrade(new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("test-name", IntSerializer.INSTANCE, new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testKeyedMapStateUpgrade(MapStateDescriptor<Integer, TestType> mapStateDescriptor, MapStateDescriptor<Integer, TestType> mapStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            MapState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, mapStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.put(1, new TestType("key-1", 1));
            partitionedState.put(2, new TestType("key-1", 2));
            partitionedState.put(3, new TestType("key-1", 3));
            createKeyedBackend.setCurrentKey(2);
            partitionedState.put(1, new TestType("key-2", 1));
            createKeyedBackend.setCurrentKey(3);
            partitionedState.put(1, new TestType("key-3", 1));
            partitionedState.put(2, new TestType("key-3", 2));
            KeyedStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            createKeyedBackend.dispose();
            createKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            MapState partitionedState2 = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, mapStateDescriptor2);
            createKeyedBackend.setCurrentKey(1);
            Iterator it = partitionedState2.iterator();
            Map.Entry entry = (Map.Entry) it.next();
            Assert.assertEquals(1, entry.getKey());
            Assert.assertEquals(new TestType("key-1", 1), entry.getValue());
            Map.Entry entry2 = (Map.Entry) it.next();
            Assert.assertEquals(2, entry2.getKey());
            Assert.assertEquals(new TestType("key-1", 2), entry2.getValue());
            Map.Entry entry3 = (Map.Entry) it.next();
            Assert.assertEquals(3, entry3.getKey());
            Assert.assertEquals(new TestType("key-1", 3), entry3.getValue());
            Assert.assertFalse(it.hasNext());
            partitionedState2.put(123, new TestType("new-key-1", 123));
            createKeyedBackend.setCurrentKey(2);
            Iterator it2 = partitionedState2.iterator();
            Map.Entry entry4 = (Map.Entry) it2.next();
            Assert.assertEquals(1, entry4.getKey());
            Assert.assertEquals(new TestType("key-2", 1), entry4.getValue());
            Assert.assertFalse(it2.hasNext());
            partitionedState2.put(456, new TestType("new-key-2", 456));
            createKeyedBackend.setCurrentKey(3);
            Iterator it3 = partitionedState2.iterator();
            Map.Entry entry5 = (Map.Entry) it3.next();
            Assert.assertEquals(1, entry5.getKey());
            Assert.assertEquals(new TestType("key-3", 1), entry5.getValue());
            Map.Entry entry6 = (Map.Entry) it3.next();
            Assert.assertEquals(2, entry6.getKey());
            Assert.assertEquals(new TestType("key-3", 2), entry6.getValue());
            Assert.assertFalse(it3.hasNext());
            partitionedState2.put(777, new TestType("new-key-3", 777));
            runSnapshot(createKeyedBackend.snapshot(2L, 3L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry).discardState();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            try {
                KeyGroupedInternalPriorityQueue create = createKeyedBackend.create("testPriorityQueue", new TestType.V1TestTypeSerializer());
                create.add(new TestType("key-1", 123));
                create.add(new TestType("key-2", 346));
                create.add(new TestType("key-1", 777));
                KeyedStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
                createKeyedBackend.dispose();
                createKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
                createKeyedBackend.create("testPriorityQueue", new TestType.IncompatibleTestTypeSerializer());
                Assert.fail("should have failed");
                createKeyedBackend.dispose();
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
                createKeyedBackend.dispose();
            }
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testStateBackendRestoreFailsIfNewKeySerializerRequiresMigration() throws Exception {
        try {
            testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.V2TestTypeSerializer());
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    @Test
    public void testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration() throws Exception {
        testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.ReconfigurationRequiringTestTypeSerializer());
    }

    @Test
    public void testStateBackendRestoreFailsIfNewKeySerializerIsIncompatible() throws Exception {
        try {
            testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.IncompatibleTestTypeSerializer());
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void testKeySerializerUpgrade(TypeSerializer<TestType> typeSerializer, TypeSerializer<TestType> typeSerializer2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend createKeyedBackend = createKeyedBackend(typeSerializer);
        try {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("test-name", Integer.class);
            ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(new TestType("foo", 123));
            partitionedState.update(1);
            createKeyedBackend.setCurrentKey(new TestType("bar", 456));
            partitionedState.update(5);
            KeyedStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            createKeyedBackend.dispose();
            createKeyedBackend = restoreKeyedBackend(typeSerializer2, runSnapshot);
            ValueState partitionedState2 = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(new TestType("foo", 123));
            Assert.assertEquals(1L, ((Integer) partitionedState2.value()).intValue());
            createKeyedBackend.setCurrentKey(new TestType("bar", 456));
            Assert.assertEquals(5L, ((Integer) partitionedState2.value()).intValue());
            runSnapshot(createKeyedBackend.snapshot(2L, 3L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry).discardState();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerRequiresMigration() throws Exception {
        try {
            testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.V2TestTypeSerializer());
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    @Test
    public void testKeyedStateRegistrationSucceedsIfNewNamespaceSerializerRequiresReconfiguration() throws Exception {
        testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.ReconfigurationRequiringTestTypeSerializer());
    }

    @Test
    public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsIncompatible() throws Exception {
        try {
            testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.IncompatibleTestTypeSerializer());
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testNamespaceSerializerUpgrade(TypeSerializer<TestType> typeSerializer, TypeSerializer<TestType> typeSerializer2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("test-name", Integer.class);
            ValueState partitionedState = createKeyedBackend.getPartitionedState(new TestType("namespace", 123), typeSerializer, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.update(10);
            createKeyedBackend.setCurrentKey(5);
            partitionedState.update(50);
            KeyedStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            createKeyedBackend.dispose();
            createKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            ValueState partitionedState2 = createKeyedBackend.getPartitionedState(new TestType("namespace", 123), typeSerializer2, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals(10L, ((Integer) partitionedState2.value()).intValue());
            partitionedState2.update(10);
            createKeyedBackend.setCurrentKey(5);
            Assert.assertEquals(50L, ((Integer) partitionedState2.value()).intValue());
            runSnapshot(createKeyedBackend.snapshot(2L, 3L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry).discardState();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testOperatorParitionableListStateMigration() throws Exception {
        testOperatorPartitionableListStateUpgrade(new ListStateDescriptor<>("partitionable-list-state", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("partitionable-list-state", new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testOperatorParitionableListStateSerializerReconfiguration() throws Exception {
        testOperatorPartitionableListStateUpgrade(new ListStateDescriptor<>("partitionable-list-state", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("partitionable-list-state", new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
        try {
            testOperatorPartitionableListStateUpgrade(new ListStateDescriptor<>("partitionable-list-state", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("partitionable-list-state", new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail("should have failed.");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testOperatorPartitionableListStateUpgrade(ListStateDescriptor<TestType> listStateDescriptor, ListStateDescriptor<TestType> listStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        OperatorStateBackend createOperatorStateBackend = createOperatorStateBackend();
        try {
            ListState listState = createOperatorStateBackend.getListState(listStateDescriptor);
            listState.add(new TestType("foo", 13));
            listState.add(new TestType("bar", 278));
            OperatorStateHandle runSnapshot = runSnapshot(createOperatorStateBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            createOperatorStateBackend.dispose();
            createOperatorStateBackend = restoreOperatorStateBackend(runSnapshot);
            ListState listState2 = createOperatorStateBackend.getListState(listStateDescriptor2);
            Iterator it = ((Iterable) listState2.get()).iterator();
            Assert.assertEquals(new TestType("foo", 13), it.next());
            Assert.assertEquals(new TestType("bar", 278), it.next());
            Assert.assertFalse(it.hasNext());
            listState2.add(new TestType("new-entry", 777));
            createOperatorStateBackend.dispose();
        } catch (Throwable th) {
            createOperatorStateBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testOperatorUnionListStateMigration() throws Exception {
        testOperatorUnionListStateUpgrade(new ListStateDescriptor<>("union-list-state", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("union-list-state", new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testOperatorUnionListStateSerializerReconfiguration() throws Exception {
        testOperatorUnionListStateUpgrade(new ListStateDescriptor<>("union-list-state", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("union-list-state", new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testOperatorUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() {
        try {
            testOperatorUnionListStateUpgrade(new ListStateDescriptor<>("union-list-state", new TestType.V1TestTypeSerializer()), new ListStateDescriptor<>("union-list-state", new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail("should have failed.");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testOperatorUnionListStateUpgrade(ListStateDescriptor<TestType> listStateDescriptor, ListStateDescriptor<TestType> listStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        OperatorStateBackend createOperatorStateBackend = createOperatorStateBackend();
        try {
            ListState unionListState = createOperatorStateBackend.getUnionListState(listStateDescriptor);
            unionListState.add(new TestType("foo", 13));
            unionListState.add(new TestType("bar", 278));
            OperatorStateHandle runSnapshot = runSnapshot(createOperatorStateBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            createOperatorStateBackend.dispose();
            createOperatorStateBackend = restoreOperatorStateBackend(runSnapshot);
            ListState unionListState2 = createOperatorStateBackend.getUnionListState(listStateDescriptor2);
            Iterator it = ((Iterable) unionListState2.get()).iterator();
            Assert.assertEquals(new TestType("foo", 13), it.next());
            Assert.assertEquals(new TestType("bar", 278), it.next());
            Assert.assertFalse(it.hasNext());
            unionListState2.add(new TestType("new-entry", 777));
            createOperatorStateBackend.dispose();
        } catch (Throwable th) {
            createOperatorStateBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testBroadcastStateValueMigration() throws Exception {
        testBroadcastStateValueUpgrade(new MapStateDescriptor<>("broadcast-state", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("broadcast-state", IntSerializer.INSTANCE, new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testBroadcastStateKeyMigration() throws Exception {
        testBroadcastStateKeyUpgrade(new MapStateDescriptor<>("broadcast-state", new TestType.V1TestTypeSerializer(), IntSerializer.INSTANCE), new MapStateDescriptor<>("broadcast-state", new TestType.V2TestTypeSerializer(), IntSerializer.INSTANCE));
    }

    @Test
    public void testBroadcastStateValueSerializerReconfiguration() throws Exception {
        testBroadcastStateValueUpgrade(new MapStateDescriptor<>("broadcast-state", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("broadcast-state", IntSerializer.INSTANCE, new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testBroadcastStateKeySerializerReconfiguration() throws Exception {
        testBroadcastStateKeyUpgrade(new MapStateDescriptor<>("broadcast-state", new TestType.V1TestTypeSerializer(), IntSerializer.INSTANCE), new MapStateDescriptor<>("broadcast-state", new TestType.ReconfigurationRequiringTestTypeSerializer(), IntSerializer.INSTANCE));
    }

    @Test
    public void testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() {
        try {
            testBroadcastStateValueUpgrade(new MapStateDescriptor<>("broadcast-state", IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()), new MapStateDescriptor<>("broadcast-state", IntSerializer.INSTANCE, new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail("should have failed.");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    @Test
    public void testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() {
        try {
            testBroadcastStateKeyUpgrade(new MapStateDescriptor<>("broadcast-state", new TestType.V1TestTypeSerializer(), IntSerializer.INSTANCE), new MapStateDescriptor<>("broadcast-state", new TestType.IncompatibleTestTypeSerializer(), IntSerializer.INSTANCE));
            Assert.fail("should have failed.");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
        }
    }

    private void testBroadcastStateValueUpgrade(MapStateDescriptor<Integer, TestType> mapStateDescriptor, MapStateDescriptor<Integer, TestType> mapStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        OperatorStateBackend createOperatorStateBackend = createOperatorStateBackend();
        try {
            BroadcastState broadcastState = createOperatorStateBackend.getBroadcastState(mapStateDescriptor);
            broadcastState.put(3, new TestType("foo", 13));
            broadcastState.put(5, new TestType("bar", 278));
            OperatorStateHandle runSnapshot = runSnapshot(createOperatorStateBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            createOperatorStateBackend.dispose();
            createOperatorStateBackend = restoreOperatorStateBackend(runSnapshot);
            BroadcastState broadcastState2 = createOperatorStateBackend.getBroadcastState(mapStateDescriptor2);
            Assert.assertEquals(new TestType("foo", 13), broadcastState2.get(3));
            Assert.assertEquals(new TestType("bar", 278), broadcastState2.get(5));
            broadcastState2.put(17, new TestType("new-entry", 777));
            createOperatorStateBackend.dispose();
        } catch (Throwable th) {
            createOperatorStateBackend.dispose();
            throw th;
        }
    }

    private void testBroadcastStateKeyUpgrade(MapStateDescriptor<TestType, Integer> mapStateDescriptor, MapStateDescriptor<TestType, Integer> mapStateDescriptor2) throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        OperatorStateBackend createOperatorStateBackend = createOperatorStateBackend();
        try {
            BroadcastState broadcastState = createOperatorStateBackend.getBroadcastState(mapStateDescriptor);
            broadcastState.put(new TestType("foo", 13), 3);
            broadcastState.put(new TestType("bar", 278), 5);
            OperatorStateHandle runSnapshot = runSnapshot(createOperatorStateBackend.snapshot(1L, 2L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            createOperatorStateBackend.dispose();
            createOperatorStateBackend = restoreOperatorStateBackend(runSnapshot);
            BroadcastState broadcastState2 = createOperatorStateBackend.getBroadcastState(mapStateDescriptor2);
            Assert.assertEquals(3, broadcastState2.get(new TestType("foo", 13)));
            Assert.assertEquals(5, broadcastState2.get(new TestType("bar", 278)));
            broadcastState2.put(new TestType("new-entry", 777), 17);
            createOperatorStateBackend.dispose();
        } catch (Throwable th) {
            createOperatorStateBackend.dispose();
            throw th;
        }
    }

    private CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStorageLocation == null) {
            this.checkpointStorageLocation = getStateBackend().createCheckpointStorage(new JobID()).initializeLocationForCheckpoint(1L);
        }
        return this.checkpointStorageLocation;
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> typeSerializer) throws Exception {
        return createKeyedBackend(typeSerializer, new DummyEnvironment());
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> typeSerializer, Environment environment) throws Exception {
        return createKeyedBackend(typeSerializer, 10, new KeyGroupRange(0, 9), environment);
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, Environment environment) throws Exception {
        return getStateBackend().createKeyedStateBackend(environment, new JobID(), "test_op", typeSerializer, i, keyGroupRange, environment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, KeyedStateHandle keyedStateHandle) throws Exception {
        return restoreKeyedBackend(typeSerializer, keyedStateHandle, new DummyEnvironment());
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, KeyedStateHandle keyedStateHandle, Environment environment) throws Exception {
        return restoreKeyedBackend(typeSerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(keyedStateHandle), environment);
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, List<KeyedStateHandle> list, Environment environment) throws Exception {
        return getStateBackend().createKeyedStateBackend(environment, new JobID(), "test_op", typeSerializer, i, keyGroupRange, environment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), list, new CloseableRegistry());
    }

    private KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        if (!runnableFuture.isDone()) {
            runnableFuture.run();
        }
        KeyedStateHandle jobManagerOwnedSnapshot = runnableFuture.get().getJobManagerOwnedSnapshot();
        if (jobManagerOwnedSnapshot != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
        }
        return jobManagerOwnedSnapshot;
    }

    private OperatorStateBackend createOperatorStateBackend() throws Exception {
        return getStateBackend().createOperatorStateBackend(new DummyEnvironment(), "test_op", Collections.emptyList(), new CloseableRegistry());
    }

    private OperatorStateBackend createOperatorStateBackend(Collection<OperatorStateHandle> collection) throws Exception {
        return getStateBackend().createOperatorStateBackend(new DummyEnvironment(), "test_op", collection, new CloseableRegistry());
    }

    private OperatorStateBackend restoreOperatorStateBackend(OperatorStateHandle operatorStateHandle) throws Exception {
        return createOperatorStateBackend(StateObjectCollection.singleton(operatorStateHandle));
    }

    private OperatorStateHandle runSnapshot(RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture) throws Exception {
        if (!runnableFuture.isDone()) {
            runnableFuture.run();
        }
        return runnableFuture.get().getJobManagerOwnedSnapshot();
    }
}
