package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest.class */
public class OperatorStateBackendTest {
    private final ClassLoader classLoader = getClass().getClassLoader();

    /* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest$MutableType.class */
    /**
     * Minimal mutable, serializable wrapper around a single {@code int}.
     *
     * <p>Two instances are equal when they are of exactly this class and carry the same value; the
     * hash code is the wrapped value itself.
     */
    static final class MutableType implements Serializable {
        private static final long serialVersionUID = 1L;

        /** The wrapped value; may be changed through {@link #setValue(int)}. */
        private int value;

        /** Creates an instance holding {@code 0}. */
        public MutableType() {
            this(0);
        }

        /** Creates an instance holding the given value. */
        public MutableType(int initialValue) {
            value = initialValue;
        }

        /** Factory shorthand for {@code new MutableType(initialValue)}. */
        static MutableType of(int initialValue) {
            return new MutableType(initialValue);
        }

        public int getValue() {
            return value;
        }

        public void setValue(int newValue) {
            value = newValue;
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            MutableType that = (MutableType) other;
            return that.value == value;
        }

        @Override
        public int hashCode() {
            return value;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest$VerifyingIntSerializer.class */
    /**
     * Test serializer for {@link Integer} that delegates all real work to
     * {@link IntSerializer#INSTANCE}.
     *
     * <p>Every {@code copy} variant additionally asserts that the calling thread's context class
     * loader equals the class loader handed to the constructor and increments the supplied counter,
     * so a test can check both that copying happened and under which class loader it ran.
     *
     * <p>The methods {@code createInstance()} and {@code deserialize(DataInputView)} carry their
     * proper names here; the decompiler had renamed them ({@code m378createInstance},
     * {@code m377deserialize}), which left the corresponding {@code TypeSerializer} methods
     * unimplemented.
     */
    private static final class VerifyingIntSerializer extends TypeSerializer<Integer> {
        private static final long serialVersionUID = -5344563614550163898L;

        /** Class loader expected as the thread context class loader during copies (transient). */
        private transient ClassLoader classLoader;

        /** Counter incremented once per {@code copy} invocation (transient). */
        private transient AtomicInteger atomicInteger;

        private VerifyingIntSerializer(ClassLoader classLoader, AtomicInteger atomicInteger) {
            this.classLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
            this.atomicInteger = (AtomicInteger) Preconditions.checkNotNull(atomicInteger);
        }

        /** Asserts the expected context class loader is installed and records one copy call. */
        private void verifyClassLoaderAndCount() {
            Assert.assertEquals(this.classLoader, Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
        }

        public boolean isImmutableType() {
            // Reporting the type as mutable; the copy methods below are where the verification lives.
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return this;
        }

        public Integer createInstance() {
            return 0;
        }

        public Integer copy(Integer num) {
            verifyClassLoaderAndCount();
            return IntSerializer.INSTANCE.copy(num);
        }

        public Integer copy(Integer num, Integer num2) {
            verifyClassLoaderAndCount();
            return IntSerializer.INSTANCE.copy(num, num2);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer num, DataOutputView dataOutputView) throws IOException {
            IntSerializer.INSTANCE.serialize(num, dataOutputView);
        }

        public Integer deserialize(DataInputView dataInputView) throws IOException {
            return IntSerializer.INSTANCE.deserialize(dataInputView);
        }

        public Integer deserialize(Integer num, DataInputView dataInputView) throws IOException {
            return IntSerializer.INSTANCE.deserialize(num, dataInputView);
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            verifyClassLoaderAndCount();
            IntSerializer.INSTANCE.copy(dataInputView, dataOutputView);
        }

        public boolean equals(Object obj) {
            // Any two VerifyingIntSerializer instances are equal, regardless of loader/counter.
            if (obj instanceof VerifyingIntSerializer) {
                return ((VerifyingIntSerializer) obj).canEqual(this);
            }
            return false;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof VerifyingIntSerializer;
        }

        public int hashCode() {
            return getClass().hashCode();
        }

        public TypeSerializerConfigSnapshot snapshotConfiguration() {
            return IntSerializer.INSTANCE.snapshotConfiguration();
        }

        public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) {
            return IntSerializer.INSTANCE.ensureCompatibility(typeSerializerConfigSnapshot);
        }
    }

    /**
     * A freshly created operator state backend obtained from a {@link MemoryStateBackend} must be
     * non-null and must not report any registered list or broadcast state names.
     */
    @Test
    public void testCreateOnAbstractStateBackend() throws Exception {
        final MemoryStateBackend stateBackend = new MemoryStateBackend();
        final OperatorStateBackend operatorBackend =
                stateBackend.createOperatorStateBackend(createMockEnvironment(), "test-operator");

        Assert.assertNotNull(operatorBackend);

        // Nothing has been registered yet, so both name sets are expected to be empty.
        Assert.assertTrue(operatorBackend.getRegisteredStateNames().isEmpty());
        Assert.assertTrue(operatorBackend.getRegisteredBroadcastStateNames().isEmpty());
    }

    /**
     * Registers list states from descriptors that carry only an element class (no explicit
     * serializer) and checks that the {@link File} state ends up with a {@link KryoSerializer}
     * that honours the Kryo serializer registered on the {@link ExecutionConfig}.
     */
    @Test
    public void testRegisterStatesWithoutTypeSerializer() throws Exception {
        // Baseline: with an untouched config, Kryo's default serializer for FutureTask is not
        // the JavaSerializer, so a later match can only come from the explicit registration.
        KryoSerializer baselineSerializer = new KryoSerializer(File.class, new ExecutionConfig());
        Object baselineFutureTaskSerializer = baselineSerializer.getKryo().getDefaultSerializer(FutureTask.class);
        Assert.assertFalse(baselineFutureTaskSerializer instanceof JavaSerializer);

        ExecutionConfig config = new ExecutionConfig();
        config.registerTypeWithKryoSerializer(FutureTask.class, JavaSerializer.class);
        DefaultOperatorStateBackend backend = new DefaultOperatorStateBackend(this.classLoader, config, false);

        ListStateDescriptor fileDescriptor = new ListStateDescriptor("test", File.class);
        ListStateDescriptor stringDescriptor = new ListStateDescriptor("test2", String.class);

        DefaultOperatorStateBackend.PartitionableListState fileState = backend.getListState(fileDescriptor);
        Assert.assertNotNull(fileState);
        ListState stringState = backend.getListState(stringDescriptor);
        Assert.assertNotNull(stringState);
        Assert.assertEquals(2L, backend.getRegisteredStateNames().size());

        // The serializer picked for the File state must be Kryo-based and must have picked up
        // the FutureTask -> JavaSerializer registration from the config.
        KryoSerializer fileElementSerializer = fileState.getStateMetaInfo().getPartitionStateSerializer();
        Assert.assertTrue(fileElementSerializer instanceof KryoSerializer);
        Assert.assertTrue(fileElementSerializer.getKryo().getSerializer(FutureTask.class) instanceof JavaSerializer);

        // The String state starts out empty and returns elements in insertion order.
        Assert.assertFalse(((Iterable) stringState.get()).iterator().hasNext());
        stringState.add("kevin");
        stringState.add("sunny");

        Iterator contents = ((Iterable) stringState.get()).iterator();
        Assert.assertEquals("kevin", contents.next());
        Assert.assertEquals("sunny", contents.next());
        Assert.assertFalse(contents.hasNext());
    }

    /**
     * Registers two split-distributed list states and one union list state on a
     * {@link DefaultOperatorStateBackend} and verifies that
     * <ul>
     *   <li>each newly registered state starts out empty and is counted in the registered names,</li>
     *   <li>elements come back in insertion order,</li>
     *   <li>re-requesting a state with the same descriptor yields a view on the same contents, and</li>
     *   <li>requesting an already registered state under the other mode (list vs. union) is
     *       rejected with an {@link IllegalStateException}.</li>
     * </ul>
     */
    @Test
    public void testRegisterStates() throws Exception {
        DefaultOperatorStateBackend defaultOperatorStateBackend = new DefaultOperatorStateBackend(this.classLoader, new ExecutionConfig(), false);
        // NOTE(review): JavaSerializer resolves to the Kryo class imported at the top of this file;
        // confirm that is the serializer type ListStateDescriptor is meant to receive here.
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor listStateDescriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor listStateDescriptor3 = new ListStateDescriptor("test3", new JavaSerializer());

        // --- first list state ---
        ListState listState = defaultOperatorStateBackend.getListState(listStateDescriptor);
        Assert.assertNotNull(listState);
        Assert.assertEquals(1L, defaultOperatorStateBackend.getRegisteredStateNames().size());
        Assert.assertFalse(((Iterable) listState.get()).iterator().hasNext());
        listState.add(42);
        listState.add(4711);
        Iterator it = ((Iterable) listState.get()).iterator();
        Assert.assertEquals(42, it.next());
        Assert.assertEquals(4711, it.next());
        Assert.assertFalse(it.hasNext());

        // --- second list state ---
        ListState listState2 = defaultOperatorStateBackend.getListState(listStateDescriptor2);
        Assert.assertNotNull(listState2);
        Assert.assertEquals(2L, defaultOperatorStateBackend.getRegisteredStateNames().size());
        // Check the new state itself is empty (previously this re-checked the exhausted
        // iterator of the first state, which could never fail).
        Assert.assertFalse(((Iterable) listState2.get()).iterator().hasNext());
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        Iterator it2 = ((Iterable) listState2.get()).iterator();
        Assert.assertEquals(7, it2.next());
        Assert.assertEquals(13, it2.next());
        Assert.assertEquals(23, it2.next());
        Assert.assertFalse(it2.hasNext());

        // --- union list state ---
        ListState unionListState = defaultOperatorStateBackend.getUnionListState(listStateDescriptor3);
        Assert.assertNotNull(unionListState);
        Assert.assertEquals(3L, defaultOperatorStateBackend.getRegisteredStateNames().size());
        // Same correction as above: verify the freshly registered union state is empty.
        Assert.assertFalse(((Iterable) unionListState.get()).iterator().hasNext());
        unionListState.add(17);
        unionListState.add(3);
        unionListState.add(123);
        Iterator it3 = ((Iterable) unionListState.get()).iterator();
        Assert.assertEquals(17, it3.next());
        Assert.assertEquals(3, it3.next());
        Assert.assertEquals(123, it3.next());
        Assert.assertFalse(it3.hasNext());

        // --- re-request the first state: additions through the new handle are visible via both ---
        ListState listState3 = defaultOperatorStateBackend.getListState(listStateDescriptor);
        Assert.assertNotNull(listState3);
        listState3.add(123);
        Iterator it4 = ((Iterable) listState3.get()).iterator();
        Assert.assertEquals(42, it4.next());
        Assert.assertEquals(4711, it4.next());
        Assert.assertEquals(123, it4.next());
        Assert.assertFalse(it4.hasNext());
        Iterator it5 = ((Iterable) listState.get()).iterator();
        Assert.assertEquals(42, it5.next());
        Assert.assertEquals(4711, it5.next());
        Assert.assertEquals(123, it5.next());
        Assert.assertFalse(it5.hasNext());
        Iterator it6 = ((Iterable) listState3.get()).iterator();
        Assert.assertEquals(42, it6.next());
        Assert.assertEquals(4711, it6.next());
        Assert.assertEquals(123, it6.next());
        Assert.assertFalse(it6.hasNext());

        // --- switching the mode of an already registered state must be rejected ---
        try {
            defaultOperatorStateBackend.getUnionListState(listStateDescriptor2);
            Assert.fail("Did not detect changed mode");
        } catch (IllegalStateException expected) {
            // expected: "test2" was registered as a plain list state
        }
        try {
            defaultOperatorStateBackend.getListState(listStateDescriptor3);
            Assert.fail("Did not detect changed mode");
        } catch (IllegalStateException expected) {
            // expected: "test3" was registered as a union list state
        }
    }

    /**
     * Takes a snapshot of a backend whose list state and broadcast state (key and value) use
     * {@link VerifyingIntSerializer}s bound to the environment's user class loader, then checks
     * via the serializers' counters that each of them was asked to copy at least once.
     */
    @Test
    public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend(4096);
        Environment env = createMockEnvironment();
        OperatorStateBackend backend = stateBackend.createOperatorStateBackend(env, "test-op-name");

        // List state with a verifying element serializer.
        AtomicInteger listCopies = new AtomicInteger(0);
        VerifyingIntSerializer listSerializer = new VerifyingIntSerializer(env.getUserClassLoader(), listCopies);
        ListStateDescriptor listDescriptor = new ListStateDescriptor("test", listSerializer);
        backend.getListState(listDescriptor).add(42);

        // Broadcast state with verifying key and value serializers.
        AtomicInteger keyCopies = new AtomicInteger(0);
        AtomicInteger valueCopies = new AtomicInteger(0);
        VerifyingIntSerializer keySerializer = new VerifyingIntSerializer(env.getUserClassLoader(), keyCopies);
        VerifyingIntSerializer valueSerializer = new VerifyingIntSerializer(env.getUserClassLoader(), valueCopies);
        MapStateDescriptor broadcastDescriptor = new MapStateDescriptor("test-broadcast", keySerializer, valueSerializer);
        BroadcastState broadcast = backend.getBroadcastState(broadcastDescriptor);
        broadcast.put(1, 2);
        broadcast.put(3, 4);
        broadcast.put(5, 6);

        RunnableFuture snapshotFuture = backend.snapshot(
                1L, 1L, new MemCheckpointStreamFactory(4096), CheckpointOptions.forCheckpointWithDefaultLocation());
        FutureUtil.runIfNotDoneAndGet(snapshotFuture);

        // The copy assertions live inside the serializers; here we only confirm they ran.
        Assert.assertTrue(listCopies.get() > 0);
        Assert.assertTrue(keyCopies.get() > 0);
        Assert.assertTrue(valueCopies.get() > 0);
    }

    /**
     * Snapshotting a backend on which no state was ever registered must produce a result whose
     * job-manager-owned snapshot is {@code null}.
     */
    @Test
    public void testSnapshotEmpty() throws Exception {
        OperatorStateBackend emptyBackend =
                new MemoryStateBackend(4096).createOperatorStateBackend(createMockEnvironment(), "testOperator");

        RunnableFuture snapshotFuture = emptyBackend.snapshot(
                0L, 0L, new MemCheckpointStreamFactory(4096), CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult result = (SnapshotResult) FutureUtil.runIfNotDoneAndGet(snapshotFuture);

        Assert.assertNull(result.getJobManagerOwnedSnapshot());
    }

    /**
     * Snapshots and restores a backend that holds only a broadcast state (no list states) in
     * three rounds: with three entries, after removing key {@code 1}, and after clearing the
     * state. After each restore the broadcast state's entries are collected into a map and
     * compared against the expected contents.
     */
    @Test
    public void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception {
        OperatorStateBackend createOperatorStateBackend = new MemoryStateBackend(4096).createOperatorStateBackend(createMockEnvironment(), "testOperator");
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("test-broadcast", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        // hashMap holds the expected contents and is mutated in lock-step with the broadcast state.
        HashMap hashMap = new HashMap(3);
        hashMap.put(1, 2);
        hashMap.put(3, 4);
        hashMap.put(5, 6);
        BroadcastState broadcastState = createOperatorStateBackend.getBroadcastState(mapStateDescriptor);
        broadcastState.putAll(hashMap);
        MemCheckpointStreamFactory memCheckpointStreamFactory = new MemCheckpointStreamFactory(4096);
        // Tracks the handle of the last snapshot so the finally block can discard it if an
        // exception is thrown after it has been assigned.
        StateObject stateObject = null;
        try {
            // Round 1: snapshot with all three entries, restore into the same backend, compare.
            OperatorStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) FutureUtil.runIfNotDoneAndGet(createOperatorStateBackend.snapshot(0L, 0L, memCheckpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()))).getJobManagerOwnedSnapshot();
            Assert.assertNotNull(jobManagerOwnedSnapshot);
            // hashMap2 collects the entries actually read back after each restore.
            HashMap hashMap2 = new HashMap();
            createOperatorStateBackend.restore(StateObjectCollection.singleton(jobManagerOwnedSnapshot));
            for (Map.Entry entry : createOperatorStateBackend.getBroadcastState(mapStateDescriptor).entries()) {
                hashMap2.put(entry.getKey(), entry.getValue());
            }
            Assert.assertEquals(hashMap, hashMap2);
            // Round 2: remove key 1 from both state and expectation, take a new snapshot, then
            // discard the previous handle before restoring from the new one.
            broadcastState.remove(1);
            hashMap.remove(1);
            SnapshotResult snapshotResult = (SnapshotResult) FutureUtil.runIfNotDoneAndGet(createOperatorStateBackend.snapshot(1L, 1L, memCheckpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            jobManagerOwnedSnapshot.discardState();
            OperatorStateHandle jobManagerOwnedSnapshot2 = snapshotResult.getJobManagerOwnedSnapshot();
            hashMap2.clear();
            createOperatorStateBackend.restore(StateObjectCollection.singleton(jobManagerOwnedSnapshot2));
            for (Map.Entry entry2 : createOperatorStateBackend.getBroadcastState(mapStateDescriptor).immutableEntries()) {
                hashMap2.put(entry2.getKey(), entry2.getValue());
            }
            Assert.assertEquals(hashMap, hashMap2);
            // Round 3: clear the state entirely, snapshot again, discard the round-2 handle.
            broadcastState.clear();
            hashMap.clear();
            SnapshotResult snapshotResult2 = (SnapshotResult) FutureUtil.runIfNotDoneAndGet(createOperatorStateBackend.snapshot(2L, 2L, memCheckpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            if (jobManagerOwnedSnapshot2 != null) {
                jobManagerOwnedSnapshot2.discardState();
            }
            stateObject = (OperatorStateHandle) snapshotResult2.getJobManagerOwnedSnapshot();
            hashMap2.clear();
            createOperatorStateBackend.restore(StateObjectCollection.singleton(stateObject));
            for (Map.Entry entry3 : createOperatorStateBackend.getBroadcastState(mapStateDescriptor).immutableEntries()) {
                hashMap2.put(entry3.getKey(), entry3.getValue());
            }
            Assert.assertTrue(hashMap.isEmpty());
            Assert.assertEquals(hashMap, hashMap2);
            // Success path: discard the last handle here and null it so finally does not
            // discard it a second time.
            if (stateObject != null) {
                stateObject.discardState();
                stateObject = null;
            }
        } finally {
            createOperatorStateBackend.close();
            createOperatorStateBackend.dispose();
            // Only non-null if an exception interrupted round 3 after the handle was assigned.
            if (stateObject != null) {
                stateObject.discardState();
            }
        }
    }

    /**
     * Fills three list states (two via {@code getListState}, one via {@code getUnionListState})
     * and two of three registered broadcast states, takes a snapshot, closes and disposes the
     * backend, and restores the snapshot into a second backend created from the same
     * {@link MemoryStateBackend}. The restored backend must report three list-state names and
     * three broadcast-state names and return every element in the order it was added.
     */
    @Test
    public void testSnapshotRestoreSync() throws Exception {
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend(8192);
        OperatorStateBackend createOperatorStateBackend = memoryStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor listStateDescriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor listStateDescriptor3 = new ListStateDescriptor("test3", new JavaSerializer());
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("test4", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor mapStateDescriptor2 = new MapStateDescriptor("test5", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor mapStateDescriptor3 = new MapStateDescriptor("test6", new JavaSerializer(), new JavaSerializer());
        ListState listState = createOperatorStateBackend.getListState(listStateDescriptor);
        ListState listState2 = createOperatorStateBackend.getListState(listStateDescriptor2);
        ListState unionListState = createOperatorStateBackend.getUnionListState(listStateDescriptor3);
        BroadcastState broadcastState = createOperatorStateBackend.getBroadcastState(mapStateDescriptor);
        BroadcastState broadcastState2 = createOperatorStateBackend.getBroadcastState(mapStateDescriptor2);
        // The third broadcast state is registered but deliberately left without entries.
        createOperatorStateBackend.getBroadcastState(mapStateDescriptor3);
        listState.add(42);
        listState.add(4711);
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        unionListState.add(17);
        unionListState.add(18);
        unionListState.add(19);
        unionListState.add(20);
        broadcastState.put(1, 2);
        broadcastState.put(2, 5);
        broadcastState2.put(2, 5);
        OperatorStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) FutureUtil.runIfNotDoneAndGet(createOperatorStateBackend.snapshot(1L, 1L, new MemCheckpointStreamFactory(8192), CheckpointOptions.forCheckpointWithDefaultLocation()))).getJobManagerOwnedSnapshot();
        try {
            // Tear down the original backend before restoring into a fresh one.
            createOperatorStateBackend.close();
            createOperatorStateBackend.dispose();
            OperatorStateBackend createOperatorStateBackend2 = memoryStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
            createOperatorStateBackend2.restore(StateObjectCollection.singleton(jobManagerOwnedSnapshot));
            // Names must already be known right after restore, before any state is requested.
            Assert.assertEquals(3L, createOperatorStateBackend2.getRegisteredStateNames().size());
            Assert.assertEquals(3L, createOperatorStateBackend2.getRegisteredBroadcastStateNames().size());
            ListState listState3 = createOperatorStateBackend2.getListState(listStateDescriptor);
            ListState listState4 = createOperatorStateBackend2.getListState(listStateDescriptor2);
            ListState unionListState2 = createOperatorStateBackend2.getUnionListState(listStateDescriptor3);
            BroadcastState broadcastState3 = createOperatorStateBackend2.getBroadcastState(mapStateDescriptor);
            BroadcastState broadcastState4 = createOperatorStateBackend2.getBroadcastState(mapStateDescriptor2);
            BroadcastState broadcastState5 = createOperatorStateBackend2.getBroadcastState(mapStateDescriptor3);
            // Requesting the states with the same descriptors must not add new registrations.
            Assert.assertEquals(3L, createOperatorStateBackend2.getRegisteredStateNames().size());
            Assert.assertEquals(3L, createOperatorStateBackend2.getRegisteredBroadcastStateNames().size());
            // List state "test1"
            Iterator it = ((Iterable) listState3.get()).iterator();
            Assert.assertEquals(42, it.next());
            Assert.assertEquals(4711, it.next());
            Assert.assertFalse(it.hasNext());
            // List state "test2"
            Iterator it2 = ((Iterable) listState4.get()).iterator();
            Assert.assertEquals(7, it2.next());
            Assert.assertEquals(13, it2.next());
            Assert.assertEquals(23, it2.next());
            Assert.assertFalse(it2.hasNext());
            // Union list state "test3"
            Iterator it3 = ((Iterable) unionListState2.get()).iterator();
            Assert.assertEquals(17, it3.next());
            Assert.assertEquals(18, it3.next());
            Assert.assertEquals(19, it3.next());
            Assert.assertEquals(20, it3.next());
            Assert.assertFalse(it3.hasNext());
            // Broadcast state "test4": entries (1 -> 2) then (2 -> 5)
            Iterator it4 = broadcastState3.iterator();
            Assert.assertTrue(it4.hasNext());
            Map.Entry entry = (Map.Entry) it4.next();
            Assert.assertEquals(1, entry.getKey());
            Assert.assertEquals(2, entry.getValue());
            Assert.assertTrue(it4.hasNext());
            Map.Entry entry2 = (Map.Entry) it4.next();
            Assert.assertEquals(2, entry2.getKey());
            Assert.assertEquals(5, entry2.getValue());
            Assert.assertFalse(it4.hasNext());
            // Broadcast state "test5": single entry (2 -> 5)
            Iterator it5 = broadcastState4.iterator();
            Assert.assertTrue(it5.hasNext());
            Map.Entry entry3 = (Map.Entry) it5.next();
            Assert.assertEquals(2, entry3.getKey());
            Assert.assertEquals(5, entry3.getValue());
            Assert.assertFalse(it5.hasNext());
            // Broadcast state "test6" was never written to and must come back empty.
            Assert.assertFalse(broadcastState5.iterator().hasNext());
            createOperatorStateBackend2.close();
            createOperatorStateBackend2.dispose();
            jobManagerOwnedSnapshot.discardState();
        } catch (Throwable th) {
            // Failure path: still release the snapshot handle, then propagate the failure.
            jobManagerOwnedSnapshot.discardState();
            throw th;
        }
    }

    /**
     * Verifies that an asynchronous snapshot captures the state as it was when
     * {@code snapshot(...)} was called, even if the live state is modified while the snapshot is
     * still being written.
     *
     * <p>The snapshot runs on a single-thread executor against a
     * {@link BlockerCheckpointStreamFactory}. The test waits on the factory's waiter latch, then
     * mutates every registered state (adds, in-place element mutation, clears, registering a new
     * state) and triggers the blocker latch part-way through. The snapshot is then restored into a
     * fresh backend, which must contain exactly the pre-snapshot values.
     *
     * <p>The executor is shut down in a {@code finally} block so that a failing assertion or a
     * failing snapshot does not leave its worker thread behind.
     */
    @Test
    public void testSnapshotRestoreAsync() throws Exception {
        DefaultOperatorStateBackend defaultOperatorStateBackend = new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor listStateDescriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor listStateDescriptor3 = new ListStateDescriptor("test3", new JavaSerializer());
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("test4", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor mapStateDescriptor2 = new MapStateDescriptor("test5", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor mapStateDescriptor3 = new MapStateDescriptor("test6", new JavaSerializer(), new JavaSerializer());
        ListState listState = defaultOperatorStateBackend.getListState(listStateDescriptor);
        ListState listState2 = defaultOperatorStateBackend.getListState(listStateDescriptor2);
        ListState unionListState = defaultOperatorStateBackend.getUnionListState(listStateDescriptor3);
        BroadcastState broadcastState = defaultOperatorStateBackend.getBroadcastState(mapStateDescriptor);
        BroadcastState broadcastState2 = defaultOperatorStateBackend.getBroadcastState(mapStateDescriptor2);
        // The third broadcast state is registered but intentionally left empty.
        defaultOperatorStateBackend.getBroadcastState(mapStateDescriptor3);
        listState.add(MutableType.of(42));
        listState.add(MutableType.of(4711));
        listState2.add(MutableType.of(7));
        listState2.add(MutableType.of(13));
        listState2.add(MutableType.of(23));
        unionListState.add(MutableType.of(17));
        unionListState.add(MutableType.of(18));
        unionListState.add(MutableType.of(19));
        unionListState.add(MutableType.of(20));
        broadcastState.put(MutableType.of(1), MutableType.of(2));
        broadcastState.put(MutableType.of(2), MutableType.of(5));
        broadcastState2.put(MutableType.of(2), MutableType.of(5));

        BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = new BlockerCheckpointStreamFactory(1048576);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        blockerCheckpointStreamFactory.setWaiterLatch(oneShotLatch);
        blockerCheckpointStreamFactory.setBlockerLatch(oneShotLatch2);
        RunnableFuture snapshot = defaultOperatorStateBackend.snapshot(1L, 1L, blockerCheckpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        try {
            newFixedThreadPool.submit(snapshot);
            // Wait on the factory's waiter latch before starting to mutate the live state.
            oneShotLatch.await();

            // Mutate the live state while the snapshot is in flight.
            listState.add(MutableType.of(77));
            broadcastState.put(MutableType.of(32), MutableType.of(97));
            int i = 0;
            for (Object element : (Iterable) listState2.get()) {
                MutableType mutableType = (MutableType) element;
                i++;
                if (i == 2) {
                    // Trigger the blocker latch in the middle of the in-place mutations.
                    oneShotLatch2.trigger();
                }
                mutableType.setValue(mutableType.getValue() + 10);
            }
            unionListState.clear();
            broadcastState2.clear();
            defaultOperatorStateBackend.getListState(new ListStateDescriptor("test4", new JavaSerializer()));

            OperatorStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) snapshot.get()).getJobManagerOwnedSnapshot();
            try {
                defaultOperatorStateBackend.close();
                defaultOperatorStateBackend.dispose();
                OperatorStateBackend createOperatorStateBackend = new MemoryStateBackend(4096).createOperatorStateBackend(createMockEnvironment(), "testOperator");
                createOperatorStateBackend.restore(StateObjectCollection.singleton(jobManagerOwnedSnapshot));
                Assert.assertEquals(3L, createOperatorStateBackend.getRegisteredStateNames().size());
                Assert.assertEquals(3L, createOperatorStateBackend.getRegisteredBroadcastStateNames().size());
                ListState listState3 = createOperatorStateBackend.getListState(listStateDescriptor);
                ListState listState4 = createOperatorStateBackend.getListState(listStateDescriptor2);
                ListState unionListState2 = createOperatorStateBackend.getUnionListState(listStateDescriptor3);
                BroadcastState broadcastState3 = createOperatorStateBackend.getBroadcastState(mapStateDescriptor);
                BroadcastState broadcastState4 = createOperatorStateBackend.getBroadcastState(mapStateDescriptor2);
                BroadcastState broadcastState5 = createOperatorStateBackend.getBroadcastState(mapStateDescriptor3);
                Assert.assertEquals(3L, createOperatorStateBackend.getRegisteredStateNames().size());
                Assert.assertEquals(3L, createOperatorStateBackend.getRegisteredBroadcastStateNames().size());

                // All values below are the pre-snapshot ones; none of the concurrent edits may show.
                Iterator it = ((Iterable) listState3.get()).iterator();
                Assert.assertEquals(42L, ((MutableType) it.next()).value);
                Assert.assertEquals(4711L, ((MutableType) it.next()).value);
                Assert.assertFalse(it.hasNext());
                Iterator it2 = ((Iterable) listState4.get()).iterator();
                Assert.assertEquals(7L, ((MutableType) it2.next()).value);
                Assert.assertEquals(13L, ((MutableType) it2.next()).value);
                Assert.assertEquals(23L, ((MutableType) it2.next()).value);
                Assert.assertFalse(it2.hasNext());
                Iterator it3 = ((Iterable) unionListState2.get()).iterator();
                Assert.assertEquals(17L, ((MutableType) it3.next()).value);
                Assert.assertEquals(18L, ((MutableType) it3.next()).value);
                Assert.assertEquals(19L, ((MutableType) it3.next()).value);
                Assert.assertEquals(20L, ((MutableType) it3.next()).value);
                Assert.assertFalse(it3.hasNext());
                Iterator it4 = broadcastState3.iterator();
                Assert.assertTrue(it4.hasNext());
                Map.Entry entry = (Map.Entry) it4.next();
                Assert.assertEquals(1L, ((MutableType) entry.getKey()).value);
                Assert.assertEquals(2L, ((MutableType) entry.getValue()).value);
                Assert.assertTrue(it4.hasNext());
                Map.Entry entry2 = (Map.Entry) it4.next();
                Assert.assertEquals(2L, ((MutableType) entry2.getKey()).value);
                Assert.assertEquals(5L, ((MutableType) entry2.getValue()).value);
                Assert.assertFalse(it4.hasNext());
                Iterator it5 = broadcastState4.iterator();
                Assert.assertTrue(it5.hasNext());
                Map.Entry entry3 = (Map.Entry) it5.next();
                Assert.assertEquals(2L, ((MutableType) entry3.getKey()).value);
                Assert.assertEquals(5L, ((MutableType) entry3.getValue()).value);
                Assert.assertFalse(it5.hasNext());
                Assert.assertFalse(broadcastState5.iterator().hasNext());
                createOperatorStateBackend.close();
                createOperatorStateBackend.dispose();
            } finally {
                // Release the snapshot handle on both the success and the failure path.
                jobManagerOwnedSnapshot.discardState();
            }
        } finally {
            // shutdownNow() also interrupts a worker still parked on the blocker latch when the
            // test fails before triggering it; on success the single task has already finished.
            newFixedThreadPool.shutdownNow();
        }
    }

    /**
     * Closes the backend while an asynchronous snapshot is in flight and expects the snapshot
     * future to fail with an {@link ExecutionException} whose cause is an {@link IOException}.
     *
     * <p>The snapshot runs on a single-thread executor against a
     * {@link BlockerCheckpointStreamFactory}; the backend is closed after the waiter latch has
     * fired and before the blocker latch is triggered. The executor is shut down in a
     * {@code finally} block so its worker thread is not leaked, whatever the outcome.
     */
    @Test
    public void testSnapshotAsyncClose() throws Exception {
        DefaultOperatorStateBackend defaultOperatorStateBackend = new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);
        ListState operatorState = defaultOperatorStateBackend.getOperatorState(new ListStateDescriptor("test1", new JavaSerializer()));
        operatorState.add(MutableType.of(42));
        operatorState.add(MutableType.of(4711));
        BroadcastState broadcastState = defaultOperatorStateBackend.getBroadcastState(new MapStateDescriptor("test4", new JavaSerializer(), new JavaSerializer()));
        broadcastState.put(MutableType.of(1), MutableType.of(2));
        broadcastState.put(MutableType.of(2), MutableType.of(5));

        BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = new BlockerCheckpointStreamFactory(1048576);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        blockerCheckpointStreamFactory.setWaiterLatch(oneShotLatch);
        blockerCheckpointStreamFactory.setBlockerLatch(oneShotLatch2);
        RunnableFuture snapshot = defaultOperatorStateBackend.snapshot(1L, 1L, blockerCheckpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            executorService.submit(snapshot);
            oneShotLatch.await();
            // Close the backend mid-snapshot, then trigger the blocker latch.
            defaultOperatorStateBackend.close();
            oneShotLatch2.trigger();
            try {
                snapshot.get(60L, TimeUnit.SECONDS);
                Assert.fail("Snapshot was expected to fail because the backend was closed while it was running");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof IOException);
            }
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Verifies that cancelling an asynchronous snapshot closes every stream created by the
     * checkpoint stream factory and that the snapshot future subsequently reports a
     * {@link CancellationException}.
     *
     * <p>The snapshot is executed on a dedicated single-thread executor, which is always shut
     * down at the end of the test so that no worker thread outlives it.
     *
     * @throws Exception if the test setup or the snapshot fails in an unexpected way
     */
    @Test
    public void testSnapshotAsyncCancel() throws Exception {
        DefaultOperatorStateBackend defaultOperatorStateBackend = new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);

        // Populate a list state so the snapshot has data to write.
        ListState operatorState = defaultOperatorStateBackend.getOperatorState(new ListStateDescriptor("test1", new JavaSerializer()));
        operatorState.add(MutableType.of(42));
        operatorState.add(MutableType.of(4711));

        // NOTE(review): presumably the waiter latch fires once the snapshot stream reaches its
        // blocking point and the blocker latch releases it again - confirm against
        // BlockerCheckpointStreamFactory.
        BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = new BlockerCheckpointStreamFactory(1048576);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        blockerCheckpointStreamFactory.setWaiterLatch(waiterLatch);
        blockerCheckpointStreamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture snapshot = defaultOperatorStateBackend.snapshot(1L, 1L, blockerCheckpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            executorService.submit(snapshot);

            // Cancel only after the waiter latch has been triggered.
            waiterLatch.await();
            snapshot.cancel(true);

            // Every stream handed out by the factory must have been closed by the cancellation.
            for (BlockingCheckpointOutputStream stream : blockerCheckpointStreamFactory.getAllCreatedStreams()) {
                Assert.assertTrue(stream.isClosed());
            }

            blockerLatch.trigger();

            try {
                snapshot.get(60L, TimeUnit.SECONDS);
                Assert.fail("The snapshot should have been reported as cancelled.");
            } catch (CancellationException expected) {
                // Expected: the future was cancelled above.
            }
        } finally {
            // Interrupt and release the worker thread even if an assertion above failed.
            executorService.shutdownNow();
        }
    }

    /**
     * Verifies that restoring operator state fails with an {@link IOException} whose message
     * contains {@code "Unable to restore operator state"} when the previous state serializer
     * cannot be loaded.
     *
     * <p>The test snapshots three list states (two regular, one union), then restores the
     * snapshot into a fresh backend whose environment uses an
     * {@code ArtificialCNFExceptionThrowingClassLoader} configured with the serializer's class
     * name. The snapshot handle is discarded exactly once, whatever the outcome.
     *
     * @throws Exception if the test setup or the snapshot fails in an unexpected way
     */
    @Test
    public void testRestoreFailsIfSerializerDeserializationFails() throws Exception {
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend createOperatorStateBackend = memoryStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");

        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor listStateDescriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor listStateDescriptor3 = new ListStateDescriptor("test3", new JavaSerializer());
        ListState listState = createOperatorStateBackend.getListState(listStateDescriptor);
        ListState listState2 = createOperatorStateBackend.getListState(listStateDescriptor2);
        ListState unionListState = createOperatorStateBackend.getUnionListState(listStateDescriptor3);

        listState.add(42);
        listState.add(4711);
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        unionListState.add(17);
        unionListState.add(18);
        unionListState.add(19);
        unionListState.add(20);

        OperatorStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) FutureUtil.runIfNotDoneAndGet(createOperatorStateBackend.snapshot(1L, 1L, new MemCheckpointStreamFactory(4096), CheckpointOptions.forCheckpointWithDefaultLocation()))).getJobManagerOwnedSnapshot();
        try {
            createOperatorStateBackend.close();
            createOperatorStateBackend.dispose();

            // NOTE(review): the class loader presumably simulates a ClassNotFoundException for
            // the serializer class named here - confirm against its implementation.
            memoryStateBackend.createOperatorStateBackend(new DummyEnvironment(new ArtificialCNFExceptionThrowingClassLoader(getClass().getClassLoader(), Collections.singleton(JavaSerializer.class.getName()))), "testOperator").restore(StateObjectCollection.singleton(jobManagerOwnedSnapshot));

            Assert.fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
        } catch (IOException expected) {
            Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
        } finally {
            // Single cleanup point for success, expected failure and unexpected failure alike.
            jobManagerOwnedSnapshot.discardState();
        }
    }

    /**
     * Builds a Mockito mock of {@link Environment} that is stubbed to return a new
     * {@link ExecutionConfig} and the class loader of this test class as user class loader.
     *
     * @return the stubbed environment mock
     */
    private static Environment createMockEnvironment() {
        final Environment mockedEnvironment = Mockito.mock(Environment.class);
        final ClassLoader testClassLoader = OperatorStateBackendTest.class.getClassLoader();
        final ExecutionConfig executionConfig = new ExecutionConfig();

        Mockito.when(mockedEnvironment.getUserClassLoader()).thenReturn(testClassLoader);
        Mockito.when(mockedEnvironment.getExecutionConfig()).thenReturn(executionConfig);
        return mockedEnvironment;
    }
}
