package org.apache.flink.runtime.state;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase.class */
public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
    protected B backend;

    protected abstract B getStateBackend() throws Exception;

    protected abstract void cleanup() throws Exception;

    @Before
    public void setup() throws Exception {
        this.backend = getStateBackend();
    }

    @After
    public void teardown() throws Exception {
        this.backend.dispose();
        cleanup();
    }

    @Test
    public void testValueState() throws Exception {
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class, (Object) null);
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        KvState kvState = (ValueState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, valueStateDescriptor);
        KvState kvState2 = kvState;
        kvState2.setCurrentKey(1);
        Assert.assertNull(kvState.value());
        kvState.update("1");
        kvState2.setCurrentKey(2);
        Assert.assertNull(kvState.value());
        kvState.update("2");
        kvState2.setCurrentKey(1);
        Assert.assertEquals("1", kvState.value());
        KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, 2L);
        if (snapshot instanceof AsynchronousKvStateSnapshot) {
            snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
        }
        kvState2.setCurrentKey(1);
        kvState.update("u1");
        kvState2.setCurrentKey(2);
        kvState.update("u2");
        kvState2.setCurrentKey(3);
        kvState.update("u3");
        KvStateSnapshot snapshot2 = kvState2.snapshot(682375462379L, 4L);
        if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
            snapshot2 = ((AsynchronousKvStateSnapshot) snapshot2).materialize();
        }
        kvState2.setCurrentKey(1);
        Assert.assertEquals("u1", kvState.value());
        kvState2.setCurrentKey(2);
        Assert.assertEquals("u2", kvState.value());
        kvState2.setCurrentKey(3);
        Assert.assertEquals("u3", kvState.value());
        kvState2.dispose();
        ValueState restoreState = snapshot.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 10L);
        snapshot.discardState();
        ValueState valueState = restoreState;
        restoreState.setCurrentKey(1);
        Assert.assertEquals("1", valueState.value());
        restoreState.setCurrentKey(2);
        Assert.assertEquals("2", valueState.value());
        restoreState.dispose();
        ValueState restoreState2 = snapshot2.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 10L);
        snapshot2.discardState();
        ValueState valueState2 = restoreState2;
        restoreState2.setCurrentKey(1);
        Assert.assertEquals("u1", valueState2.value());
        restoreState2.setCurrentKey(2);
        Assert.assertEquals("u2", valueState2.value());
        restoreState2.setCurrentKey(3);
        Assert.assertEquals("u3", valueState2.value());
    }

    @Test
    public void testValueStateNullUpdate() throws Exception {
        try {
            LongSerializer.INSTANCE.serialize((Long) null, new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
            Assert.fail("Should faill with NullPointerException");
        } catch (NullPointerException e) {
        }
        this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", LongSerializer.INSTANCE, 42L);
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        KvState kvState = (ValueState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, valueStateDescriptor);
        KvState kvState2 = kvState;
        kvState2.setCurrentKey(1);
        Assert.assertEquals(42L, ((Long) kvState.value()).longValue());
        kvState.update(1L);
        Assert.assertEquals(1L, ((Long) kvState.value()).longValue());
        kvState2.setCurrentKey(2);
        Assert.assertEquals(42L, ((Long) kvState.value()).longValue());
        kvState2.setCurrentKey(1);
        kvState.clear();
        Assert.assertEquals(42L, ((Long) kvState.value()).longValue());
        kvState.update(17L);
        Assert.assertEquals(17L, ((Long) kvState.value()).longValue());
        kvState.update((Object) null);
        Assert.assertEquals(42L, ((Long) kvState.value()).longValue());
        KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, 2L);
        if (snapshot instanceof AsynchronousKvStateSnapshot) {
            snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
        }
        kvState2.dispose();
        snapshot.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 10L);
    }

    @Test
    public void testListState() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            KvState kvState = (ListState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, new ListStateDescriptor("id", String.class));
            KvState kvState2 = kvState;
            Joiner on = Joiner.on(",");
            kvState2.setCurrentKey(1);
            Assert.assertEquals("", on.join((Iterable) kvState.get()));
            kvState.add("1");
            kvState2.setCurrentKey(2);
            Assert.assertEquals("", on.join((Iterable) kvState.get()));
            kvState.add("2");
            kvState2.setCurrentKey(1);
            Assert.assertEquals("1", on.join((Iterable) kvState.get()));
            KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, 2L);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
            }
            kvState2.setCurrentKey(1);
            kvState.add("u1");
            kvState2.setCurrentKey(2);
            kvState.add("u2");
            kvState2.setCurrentKey(3);
            kvState.add("u3");
            KvStateSnapshot snapshot2 = kvState2.snapshot(682375462379L, 4L);
            if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
                snapshot2 = ((AsynchronousKvStateSnapshot) snapshot2).materialize();
            }
            kvState2.setCurrentKey(1);
            Assert.assertEquals("1,u1", on.join((Iterable) kvState.get()));
            kvState2.setCurrentKey(2);
            Assert.assertEquals("2,u2", on.join((Iterable) kvState.get()));
            kvState2.setCurrentKey(3);
            Assert.assertEquals("u3", on.join((Iterable) kvState.get()));
            kvState2.dispose();
            ListState restoreState = snapshot.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 10L);
            snapshot.discardState();
            ListState listState = restoreState;
            restoreState.setCurrentKey(1);
            Assert.assertEquals("1", on.join((Iterable) listState.get()));
            restoreState.setCurrentKey(2);
            Assert.assertEquals("2", on.join((Iterable) listState.get()));
            restoreState.dispose();
            ListState restoreState2 = snapshot2.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 20L);
            snapshot2.discardState();
            ListState listState2 = restoreState2;
            restoreState2.setCurrentKey(1);
            Assert.assertEquals("1,u1", on.join((Iterable) listState2.get()));
            restoreState2.setCurrentKey(2);
            Assert.assertEquals("2,u2", on.join((Iterable) listState2.get()));
            restoreState2.setCurrentKey(3);
            Assert.assertEquals("u3", on.join((Iterable) listState2.get()));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testReducingState() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            KvState kvState = (ReducingState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, new ReducingStateDescriptor("id", new ReduceFunction<String>() { // from class: org.apache.flink.runtime.state.StateBackendTestBase.1
                private static final long serialVersionUID = 1;

                public String reduce(String str, String str2) throws Exception {
                    return str + "," + str2;
                }
            }, String.class));
            KvState kvState2 = kvState;
            Joiner.on(",");
            kvState2.setCurrentKey(1);
            Assert.assertEquals((Object) null, kvState.get());
            kvState.add("1");
            kvState2.setCurrentKey(2);
            Assert.assertEquals((Object) null, kvState.get());
            kvState.add("2");
            kvState2.setCurrentKey(1);
            Assert.assertEquals("1", kvState.get());
            KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, 2L);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
            }
            kvState2.setCurrentKey(1);
            kvState.add("u1");
            kvState2.setCurrentKey(2);
            kvState.add("u2");
            kvState2.setCurrentKey(3);
            kvState.add("u3");
            KvStateSnapshot snapshot2 = kvState2.snapshot(682375462379L, 4L);
            if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
                snapshot2 = ((AsynchronousKvStateSnapshot) snapshot2).materialize();
            }
            kvState2.setCurrentKey(1);
            Assert.assertEquals("1,u1", kvState.get());
            kvState2.setCurrentKey(2);
            Assert.assertEquals("2,u2", kvState.get());
            kvState2.setCurrentKey(3);
            Assert.assertEquals("u3", kvState.get());
            kvState2.dispose();
            ReducingState restoreState = snapshot.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 10L);
            snapshot.discardState();
            ReducingState reducingState = restoreState;
            restoreState.setCurrentKey(1);
            Assert.assertEquals("1", reducingState.get());
            restoreState.setCurrentKey(2);
            Assert.assertEquals("2", reducingState.get());
            restoreState.dispose();
            ReducingState restoreState2 = snapshot2.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 20L);
            snapshot2.discardState();
            ReducingState reducingState2 = restoreState2;
            restoreState2.setCurrentKey(1);
            Assert.assertEquals("1,u1", reducingState2.get());
            restoreState2.setCurrentKey(2);
            Assert.assertEquals("2,u2", reducingState2.get());
            restoreState2.setCurrentKey(3);
            Assert.assertEquals("u3", reducingState2.get());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testFoldingState() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            KvState kvState = (FoldingState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, new FoldingStateDescriptor("id", "Fold-Initial:", new FoldFunction<Integer, String>() { // from class: org.apache.flink.runtime.state.StateBackendTestBase.2
                private static final long serialVersionUID = 1;

                public String fold(String str, Integer num) throws Exception {
                    return str + "," + num;
                }
            }, String.class));
            KvState kvState2 = kvState;
            Joiner.on(",");
            kvState2.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:", kvState.get());
            kvState.add(1);
            kvState2.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:", kvState.get());
            kvState.add(2);
            kvState2.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", kvState.get());
            KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, 2L);
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
            }
            kvState2.setCurrentKey(1);
            kvState.clear();
            kvState.add(101);
            kvState2.setCurrentKey(2);
            kvState.add(102);
            kvState2.setCurrentKey(3);
            kvState.add(103);
            KvStateSnapshot snapshot2 = kvState2.snapshot(682375462379L, 4L);
            if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
                snapshot2 = ((AsynchronousKvStateSnapshot) snapshot2).materialize();
            }
            kvState2.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", kvState.get());
            kvState2.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", kvState.get());
            kvState2.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", kvState.get());
            kvState2.dispose();
            FoldingState restoreState = snapshot.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 10L);
            snapshot.discardState();
            FoldingState foldingState = restoreState;
            restoreState.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", foldingState.get());
            restoreState.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2", foldingState.get());
            restoreState.dispose();
            FoldingState restoreState2 = snapshot2.restoreState(this.backend, IntSerializer.INSTANCE, getClass().getClassLoader(), 20L);
            snapshot2.discardState();
            FoldingState foldingState2 = restoreState2;
            restoreState2.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", foldingState2.get());
            restoreState2.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", foldingState2.get());
            restoreState2.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", foldingState2.get());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testValueStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class, (Object) null);
            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            KvState kvState = (ValueState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, valueStateDescriptor);
            KvState kvState2 = kvState;
            kvState2.setCurrentKey(1);
            kvState.update("1");
            kvState2.setCurrentKey(2);
            kvState.update("2");
            KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, System.currentTimeMillis());
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
            }
            try {
                snapshot.restoreState(this.backend, FloatSerializer.INSTANCE, getClass().getClassLoader(), 1L);
                Assert.fail("should recognize wrong serializers");
            } catch (IllegalArgumentException e) {
            } catch (Exception e2) {
                Assert.fail("wrong exception");
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testListStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            KvState kvState = (ListState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, new ListStateDescriptor("id", String.class));
            KvState kvState2 = kvState;
            kvState2.setCurrentKey(1);
            kvState.add("1");
            kvState2.setCurrentKey(2);
            kvState.add("2");
            KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, System.currentTimeMillis());
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
            }
            kvState2.dispose();
            try {
                snapshot.restoreState(this.backend, FloatSerializer.INSTANCE, getClass().getClassLoader(), 1L);
                Assert.fail("should recognize wrong serializers");
            } catch (IllegalArgumentException e) {
            } catch (Exception e2) {
                Assert.fail("wrong exception " + e2);
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testReducingStateRestoreWithWrongSerializers() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            KvState kvState = (ReducingState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, new ReducingStateDescriptor("id", new ReduceFunction<String>() { // from class: org.apache.flink.runtime.state.StateBackendTestBase.3
                public String reduce(String str, String str2) throws Exception {
                    return str + "," + str2;
                }
            }, String.class));
            KvState kvState2 = kvState;
            kvState2.setCurrentKey(1);
            kvState.add("1");
            kvState2.setCurrentKey(2);
            kvState.add("2");
            KvStateSnapshot snapshot = kvState2.snapshot(682375462378L, System.currentTimeMillis());
            if (snapshot instanceof AsynchronousKvStateSnapshot) {
                snapshot = ((AsynchronousKvStateSnapshot) snapshot).materialize();
            }
            kvState2.dispose();
            try {
                snapshot.restoreState(this.backend, FloatSerializer.INSTANCE, getClass().getClassLoader(), 1L);
                Assert.fail("should recognize wrong serializers");
            } catch (IllegalArgumentException e) {
            } catch (Exception e2) {
                Assert.fail("wrong exception " + e2);
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testCopyDefaultValue() {
        try {
            this.backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", IntValue.class, new IntValue(-1));
            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            KvState kvState = (ValueState) this.backend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, valueStateDescriptor);
            KvState kvState2 = kvState;
            kvState2.setCurrentKey(1);
            IntValue intValue = (IntValue) kvState.value();
            kvState2.setCurrentKey(2);
            IntValue intValue2 = (IntValue) kvState.value();
            Assert.assertNotNull(intValue);
            Assert.assertNotNull(intValue2);
            Assert.assertEquals(intValue, intValue2);
            Assert.assertFalse(intValue == intValue2);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
