package org.apache.flink.runtime.state;

import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase.class */
public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
    protected B backend;

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$AppendingFold.class */
    /**
     * Fold function used by the folding-state test: appends every incoming integer to the
     * accumulated string, separated by a comma.
     */
    private static class AppendingFold implements FoldFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        private AppendingFold() {
        }

        /**
         * @param str the string accumulated so far
         * @param num the value to append
         * @return {@code str}, a comma, and {@code num} concatenated
         */
        public String fold(String str, Integer num) throws Exception {
            StringBuilder joined = new StringBuilder();
            joined.append(str).append(",").append(num);
            return joined.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$AppendingReduce.class */
    /**
     * Reduce function used by the reducing-state tests: joins the two inputs with a comma.
     */
    private static class AppendingReduce implements ReduceFunction<String> {
        // Declared explicitly, matching AppendingFold, so the serialized form does not depend on a
        // compiler-generated id. NOTE(review): assumes ReduceFunction is Serializable, as
        // FoldFunction evidently is given AppendingFold's id -- confirm against the Flink API.
        private static final long serialVersionUID = 1L;

        private AppendingReduce() {
        }

        /**
         * @param str the left-hand value
         * @param str2 the right-hand value
         * @return {@code str}, a comma, and {@code str2} concatenated
         */
        public String reduce(String str, String str2) throws Exception {
            return str + "," + str2;
        }
    }

    /**
     * Supplies the state backend under test. Called from {@link #setup()} before every test; the
     * returned instance is stored in {@link #backend}.
     *
     * @return the backend the tests should run against
     * @throws Exception if the backend cannot be created
     */
    protected abstract B getStateBackend() throws Exception;

    /**
     * Hook for subclass-specific cleanup. Called from {@link #teardown()} after the backend has
     * been disposed.
     *
     * @throws Exception if the cleanup fails
     */
    protected abstract void cleanup() throws Exception;

    /**
     * Obtains the backend under test from the concrete subclass before each test.
     *
     * @throws Exception if the subclass fails to create the backend
     */
    @Before
    public void setup() throws Exception {
        B backendUnderTest = getStateBackend();
        this.backend = backendUnderTest;
    }

    /**
     * Disposes the backend and then runs the subclass cleanup hook after each test.
     *
     * <p>{@link #cleanup()} runs even when disposing the backend throws, and also when
     * {@link #setup()} failed before a backend was assigned, so that whatever the subclass
     * allocated is still released.
     *
     * @throws Exception if disposing the backend or the cleanup hook fails
     */
    @After
    public void teardown() throws Exception {
        try {
            // backend is still null if getStateBackend() threw in setup().
            if (this.backend != null) {
                this.backend.dispose();
            }
        } finally {
            cleanup();
        }
    }

    /**
     * Writes per-key values into a value state, takes two snapshots with further updates in
     * between, and checks that restoring each snapshot brings back exactly the values that were
     * present when it was taken.
     */
    @Test
    public void testValueState() throws Exception {
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class, (Object) null);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);

        // Keys 1 and 2 start out empty and then hold independent values.
        this.backend.setCurrentKey(1);
        Assert.assertNull(state.value());
        state.update("1");
        this.backend.setCurrentKey(2);
        Assert.assertNull(state.value());
        state.update("2");
        this.backend.setCurrentKey(1);
        Assert.assertEquals("1", state.value());

        // First snapshot; asynchronous snapshots are replaced by their materialized form.
        HashMap firstSnapshot = this.backend.snapshotPartitionedState(682375462378L, 2L);
        for (Object key : firstSnapshot.keySet()) {
            Object snapshot = firstSnapshot.get(key);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                firstSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
            }
        }

        // Overwrite keys 1 and 2 and introduce key 3 before the second snapshot.
        this.backend.setCurrentKey(1);
        state.update("u1");
        this.backend.setCurrentKey(2);
        state.update("u2");
        this.backend.setCurrentKey(3);
        state.update("u3");

        HashMap secondSnapshot = this.backend.snapshotPartitionedState(682375462379L, 4L);
        for (Object key : secondSnapshot.keySet()) {
            Object snapshot = secondSnapshot.get(key);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                secondSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
            }
        }

        // The live state reflects the updates.
        this.backend.setCurrentKey(1);
        Assert.assertEquals("u1", state.value());
        this.backend.setCurrentKey(2);
        Assert.assertEquals("u2", state.value());
        this.backend.setCurrentKey(3);
        Assert.assertEquals("u3", state.value());

        // Restore the first snapshot into a re-initialized backend.
        this.backend.dispose();
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        this.backend.injectKeyValueStateSnapshots(firstSnapshot);
        for (Object snapshot : firstSnapshot.values()) {
            ((KvStateSnapshot) snapshot).discardState();
        }
        ValueState restoredFromFirst = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
        this.backend.setCurrentKey(1);
        Assert.assertEquals("1", restoredFromFirst.value());
        this.backend.setCurrentKey(2);
        Assert.assertEquals("2", restoredFromFirst.value());

        // Restore the second snapshot into a re-initialized backend.
        this.backend.dispose();
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        this.backend.injectKeyValueStateSnapshots(secondSnapshot);
        for (Object snapshot : secondSnapshot.values()) {
            ((KvStateSnapshot) snapshot).discardState();
        }
        ValueState restoredFromSecond = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
        this.backend.setCurrentKey(1);
        Assert.assertEquals("u1", restoredFromSecond.value());
        this.backend.setCurrentKey(2);
        Assert.assertEquals("u2", restoredFromSecond.value());
        this.backend.setCurrentKey(3);
        Assert.assertEquals("u3", restoredFromSecond.value());
    }

    /**
     * Checks that a value state with a default value falls back to that default when it is
     * cleared or updated with {@code null}, and that such a state can still be snapshotted and
     * restored even though its serializer rejects {@code null}.
     */
    @Test
    public void testValueStateNullUpdate() throws Exception {
        // Precondition: the serializer used below cannot serialize null itself.
        try {
            LongSerializer.INSTANCE.serialize((Long) null, new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
            Assert.fail("Should fail with NullPointerException");
        } catch (NullPointerException expected) {
            // expected: this is the behavior the rest of the test relies on
        }

        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", LongSerializer.INSTANCE, 42L);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);

        // An untouched key reports the default; an update replaces it for that key only.
        this.backend.setCurrentKey(1);
        Assert.assertEquals(42L, ((Long) state.value()).longValue());
        state.update(1L);
        Assert.assertEquals(1L, ((Long) state.value()).longValue());
        this.backend.setCurrentKey(2);
        Assert.assertEquals(42L, ((Long) state.value()).longValue());

        // clear() and update(null) both bring the default back.
        this.backend.setCurrentKey(1);
        state.clear();
        Assert.assertEquals(42L, ((Long) state.value()).longValue());
        state.update(17L);
        Assert.assertEquals(17L, ((Long) state.value()).longValue());
        state.update((Object) null);
        Assert.assertEquals(42L, ((Long) state.value()).longValue());

        // Snapshot; asynchronous snapshots are replaced by their materialized form.
        HashMap snapshots = this.backend.snapshotPartitionedState(682375462378L, 2L);
        for (Object key : snapshots.keySet()) {
            Object snapshot = snapshots.get(key);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshots.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
            }
        }

        // Restore into a re-initialized backend; passing means none of this throws.
        this.backend.dispose();
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        this.backend.injectKeyValueStateSnapshots(snapshots);
        for (Object snapshot : snapshots.values()) {
            ((KvStateSnapshot) snapshot).discardState();
        }
        this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
    }

    /**
     * Appends per-key elements to a list state, takes two snapshots with further additions in
     * between, and checks that restoring each snapshot yields exactly the elements that were
     * present when it was taken.
     *
     * <p>Unexpected exceptions fail the test with the original exception attached as the cause.
     */
    @Test
    public void testListState() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ListStateDescriptor descriptor = new ListStateDescriptor("id", String.class);
            ListState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            Joiner joiner = Joiner.on(",");

            // Keys 1 and 2 start out empty and then hold independent lists.
            this.backend.setCurrentKey(1);
            Assert.assertNull(state.get());
            state.add("1");
            this.backend.setCurrentKey(2);
            Assert.assertNull(state.get());
            state.add("2");
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1", joiner.join((Iterable) state.get()));

            // First snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap firstSnapshot = this.backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : firstSnapshot.keySet()) {
                Object snapshot = firstSnapshot.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    firstSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Append to keys 1 and 2 and introduce key 3 before the second snapshot.
            this.backend.setCurrentKey(1);
            state.add("u1");
            this.backend.setCurrentKey(2);
            state.add("u2");
            this.backend.setCurrentKey(3);
            state.add("u3");

            HashMap secondSnapshot = this.backend.snapshotPartitionedState(682375462379L, 4L);
            for (Object key : secondSnapshot.keySet()) {
                Object snapshot = secondSnapshot.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    secondSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // The live state reflects the additions.
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", joiner.join((Iterable) state.get()));
            this.backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", joiner.join((Iterable) state.get()));
            this.backend.setCurrentKey(3);
            Assert.assertEquals("u3", joiner.join((Iterable) state.get()));

            // Restore the first snapshot into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(firstSnapshot);
            for (Object snapshot : firstSnapshot.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }
            ListState restoredFromFirst = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1", joiner.join((Iterable) restoredFromFirst.get()));
            this.backend.setCurrentKey(2);
            Assert.assertEquals("2", joiner.join((Iterable) restoredFromFirst.get()));

            // Restore the second snapshot into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(secondSnapshot);
            for (Object snapshot : secondSnapshot.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }
            ListState restoredFromSecond = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", joiner.join((Iterable) restoredFromSecond.get()));
            this.backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", joiner.join((Iterable) restoredFromSecond.get()));
            this.backend.setCurrentKey(3);
            Assert.assertEquals("u3", joiner.join((Iterable) restoredFromSecond.get()));
        } catch (Exception e) {
            // Wrap rather than fail(e.getMessage()): keeps the exception type and stack trace in
            // the test report, and still gives a useful message when e.getMessage() is null.
            throw new AssertionError(e);
        }
    }

    /**
     * Adds per-key values to a reducing state backed by {@code AppendingReduce}, takes two
     * snapshots with further additions in between, and checks that restoring each snapshot yields
     * exactly the reduced value that was present when it was taken.
     *
     * <p>Unexpected exceptions fail the test with the original exception attached as the cause.
     */
    @Test
    public void testReducingState() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ReducingStateDescriptor descriptor = new ReducingStateDescriptor("id", new AppendingReduce(), String.class);
            ReducingState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);

            // Keys 1 and 2 start out empty and then hold independent values.
            this.backend.setCurrentKey(1);
            Assert.assertNull(state.get());
            state.add("1");
            this.backend.setCurrentKey(2);
            Assert.assertNull(state.get());
            state.add("2");
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1", state.get());

            // First snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap firstSnapshot = this.backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : firstSnapshot.keySet()) {
                Object snapshot = firstSnapshot.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    firstSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Reduce more values into keys 1 and 2 and introduce key 3 before the second snapshot.
            this.backend.setCurrentKey(1);
            state.add("u1");
            this.backend.setCurrentKey(2);
            state.add("u2");
            this.backend.setCurrentKey(3);
            state.add("u3");

            HashMap secondSnapshot = this.backend.snapshotPartitionedState(682375462379L, 4L);
            for (Object key : secondSnapshot.keySet()) {
                Object snapshot = secondSnapshot.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    secondSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // The live state reflects the additions.
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", state.get());
            this.backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", state.get());
            this.backend.setCurrentKey(3);
            Assert.assertEquals("u3", state.get());

            // Restore the first snapshot into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(firstSnapshot);
            for (Object snapshot : firstSnapshot.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }
            ReducingState restoredFromFirst = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1", restoredFromFirst.get());
            this.backend.setCurrentKey(2);
            Assert.assertEquals("2", restoredFromFirst.get());

            // Restore the second snapshot into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(secondSnapshot);
            for (Object snapshot : secondSnapshot.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }
            ReducingState restoredFromSecond = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("1,u1", restoredFromSecond.get());
            this.backend.setCurrentKey(2);
            Assert.assertEquals("2,u2", restoredFromSecond.get());
            this.backend.setCurrentKey(3);
            Assert.assertEquals("u3", restoredFromSecond.get());
        } catch (Exception e) {
            // Wrap rather than fail(e.getMessage()): keeps the exception type and stack trace in
            // the test report, and still gives a useful message when e.getMessage() is null.
            throw new AssertionError(e);
        }
    }

    /**
     * Folds per-key integers into a folding state that starts from {@code "Fold-Initial:"} and
     * uses {@code AppendingFold}, takes two snapshots with further changes in between, and checks
     * that restoring each snapshot yields exactly the folded value present when it was taken.
     *
     * <p>Unexpected exceptions fail the test with the original exception attached as the cause.
     */
    @Test
    public void testFoldingState() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            FoldingStateDescriptor descriptor = new FoldingStateDescriptor("id", "Fold-Initial:", new AppendingFold(), String.class);
            FoldingState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);

            // Untouched keys report null; the initial value only shows up once something is added.
            this.backend.setCurrentKey(1);
            Assert.assertNull(state.get());
            state.add(1);
            this.backend.setCurrentKey(2);
            Assert.assertNull(state.get());
            state.add(2);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", state.get());

            // First snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap firstSnapshot = this.backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : firstSnapshot.keySet()) {
                Object snapshot = firstSnapshot.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    firstSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Key 1 is cleared first, so its fold restarts from the initial value.
            this.backend.setCurrentKey(1);
            state.clear();
            state.add(101);
            this.backend.setCurrentKey(2);
            state.add(102);
            this.backend.setCurrentKey(3);
            state.add(103);

            HashMap secondSnapshot = this.backend.snapshotPartitionedState(682375462379L, 4L);
            for (Object key : secondSnapshot.keySet()) {
                Object snapshot = secondSnapshot.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    secondSnapshot.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // The live state reflects the changes.
            this.backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", state.get());
            this.backend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", state.get());
            this.backend.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", state.get());

            // Restore the first snapshot into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(firstSnapshot);
            for (Object snapshot : firstSnapshot.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }
            FoldingState restoredFromFirst = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", restoredFromFirst.get());
            this.backend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2", restoredFromFirst.get());

            // Restore the second snapshot into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(secondSnapshot);
            for (Object snapshot : secondSnapshot.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }
            FoldingState restoredFromSecond = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", restoredFromSecond.get());
            this.backend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", restoredFromSecond.get());
            this.backend.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", restoredFromSecond.get());
        } catch (Exception e) {
            // Wrap rather than fail(e.getMessage()): keeps the exception type and stack trace in
            // the test report, and still gives a useful message when e.getMessage() is null.
            throw new AssertionError(e);
        }
    }

    /**
     * Snapshots a string value state, restores it, and then accesses it through a descriptor that
     * uses a {@code FloatSerializer}. The access must fail with a {@link RuntimeException} whose
     * message contains "Trying to access state using wrong StateDescriptor".
     *
     * <p>Unexpected exceptions fail the test with the original exception attached as the cause.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class, (Object) null);
            descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            state.update("1");
            this.backend.setCurrentKey(2);
            state.update("2");

            // Snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshots = this.backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshots.keySet()) {
                Object snapshot = snapshots.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshots.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Restore into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(snapshots);
            for (Object snapshot : snapshots.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            try {
                ValueStateDescriptor wrongDescriptor = new ValueStateDescriptor("id", FloatSerializer.INSTANCE, (Object) null);
                ValueState wrongState = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, wrongDescriptor);
                wrongState.value();
                Assert.fail("should recognize wrong serializers");
            } catch (RuntimeException e) {
                // A message-less exception is also the wrong one; check for null so it is reported
                // as such instead of surfacing as a NullPointerException from this handler.
                String message = e.getMessage();
                if (message == null || !message.contains("Trying to access state using wrong StateDescriptor")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
        } catch (Exception e) {
            // Wrap rather than fail(e.getMessage()): keeps the exception type and stack trace in
            // the test report, and still gives a useful message when e.getMessage() is null.
            throw new AssertionError(e);
        }
    }

    /**
     * Snapshots a string list state, restores it, and then accesses it through a descriptor that
     * uses a {@code FloatSerializer}. The access must fail with a {@link RuntimeException} whose
     * message contains "Trying to access state using wrong StateDescriptor".
     *
     * <p>Unexpected exceptions fail the test with the original exception attached as the cause.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ListStateDescriptor descriptor = new ListStateDescriptor("id", String.class);
            ListState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            state.add("1");
            this.backend.setCurrentKey(2);
            state.add("2");

            // Snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshots = this.backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshots.keySet()) {
                Object snapshot = snapshots.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshots.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Restore into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(snapshots);
            for (Object snapshot : snapshots.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            try {
                ListStateDescriptor wrongDescriptor = new ListStateDescriptor("id", FloatSerializer.INSTANCE);
                ListState wrongState = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, wrongDescriptor);
                wrongState.get();
                Assert.fail("should recognize wrong serializers");
            } catch (RuntimeException e) {
                // A message-less exception is also the wrong one; check for null so it is reported
                // as such instead of surfacing as a NullPointerException from this handler.
                String message = e.getMessage();
                if (message == null || !message.contains("Trying to access state using wrong StateDescriptor")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
        } catch (Exception e) {
            // Wrap rather than fail(e.getMessage()): keeps the exception type and stack trace in
            // the test report, and still gives a useful message when e.getMessage() is null.
            throw new AssertionError(e);
        }
    }

    /**
     * Snapshots a string reducing state, restores it, and then accesses it through a descriptor
     * that uses a {@code FloatSerializer}. The access must fail with a {@link RuntimeException}
     * whose message contains "Trying to access state using wrong StateDescriptor".
     *
     * <p>Unexpected exceptions fail the test with the original exception attached as the cause.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ReducingStateDescriptor descriptor = new ReducingStateDescriptor("id", new AppendingReduce(), StringSerializer.INSTANCE);
            ReducingState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);
            this.backend.setCurrentKey(1);
            state.add("1");
            this.backend.setCurrentKey(2);
            state.add("2");

            // Snapshot; asynchronous snapshots are replaced by their materialized form.
            HashMap snapshots = this.backend.snapshotPartitionedState(682375462378L, 2L);
            for (Object key : snapshots.keySet()) {
                Object snapshot = snapshots.get(key);
                if (snapshot instanceof AsynchronousKvStateSnapshot) {
                    snapshots.put(key, ((AsynchronousKvStateSnapshot) snapshot).materialize());
                }
            }

            // Restore into a re-initialized backend.
            this.backend.dispose();
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            this.backend.injectKeyValueStateSnapshots(snapshots);
            for (Object snapshot : snapshots.values()) {
                ((KvStateSnapshot) snapshot).discardState();
            }

            try {
                ReducingStateDescriptor wrongDescriptor = new ReducingStateDescriptor("id", new AppendingReduce(), FloatSerializer.INSTANCE);
                ReducingState wrongState = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, wrongDescriptor);
                wrongState.get();
                Assert.fail("should recognize wrong serializers");
            } catch (RuntimeException e) {
                // A message-less exception is also the wrong one; check for null so it is reported
                // as such instead of surfacing as a NullPointerException from this handler.
                String message = e.getMessage();
                if (message == null || !message.contains("Trying to access state using wrong StateDescriptor")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
        } catch (Exception e) {
            // Wrap rather than fail(e.getMessage()): keeps the exception type and stack trace in
            // the test report, and still gives a useful message when e.getMessage() is null.
            throw new AssertionError(e);
        }
    }

    /**
     * Checks that the default value handed out for two different keys is equal in content but is
     * not the same object instance.
     */
    @Test
    public void testCopyDefaultValue() throws Exception {
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", IntValue.class, new IntValue(-1));
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, descriptor);

        // Read the default under two different keys.
        this.backend.setCurrentKey(1);
        IntValue defaultForKey1 = (IntValue) state.value();
        this.backend.setCurrentKey(2);
        IntValue defaultForKey2 = (IntValue) state.value();

        Assert.assertNotNull(defaultForKey1);
        Assert.assertNotNull(defaultForKey2);
        Assert.assertEquals(defaultForKey1, defaultForKey2);
        // Equal, but each key must get its own instance.
        Assert.assertNotSame(defaultForKey1, defaultForKey2);
    }
}
