package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.class */
/**
 * Tests for {@link CopyOnWriteStateTable}: serializer propagation after a meta info change,
 * serializer usage while writing a snapshot, and release of per-key-group snapshot versions.
 */
public class CopyOnWriteStateTableTest {

    /**
     * Snapshot version that the release tests track. Each release test takes exactly one
     * snapshot of a freshly created table; presumably that first snapshot is assigned
     * version 1 (the tests assert that this version is still registered for key groups that
     * have not been written yet).
     */
    private static final int SNAPSHOT_VERSION = 1;

    /**
     * Verifies that after {@code setMetaInfo} is called with a new state serializer, every
     * state map of the table reports the same state serializer as the table itself.
     */
    @Test
    public void testSerializerAfterMetaInfoChanged() {
        RegisteredKeyValueStateBackendMetaInfo initialMetaInfo =
                new RegisteredKeyValueStateBackendMetaInfo(
                        StateDescriptor.Type.VALUE,
                        "test",
                        IntSerializer.INSTANCE,
                        new TestType.V1TestTypeSerializer());
        CopyOnWriteStateTable table =
                new CopyOnWriteStateTable(
                        new InternalKeyContextImpl(KeyGroupRange.of(0, 9), 10),
                        initialMetaInfo,
                        IntSerializer.INSTANCE);

        RegisteredKeyValueStateBackendMetaInfo updatedMetaInfo =
                new RegisteredKeyValueStateBackendMetaInfo(
                        StateDescriptor.Type.VALUE,
                        "test",
                        IntSerializer.INSTANCE,
                        new TestType.V2TestTypeSerializer());
        table.setMetaInfo(updatedMetaInfo);

        // Guard against a vacuous pass: the loop below must inspect at least one state map.
        Preconditions.checkState(table.getState().length > 0);
        for (CopyOnWriteStateMap stateMap : table.getState()) {
            Assert.assertEquals(table.getStateSerializer(), stateMap.getStateSerializer());
        }
    }

    /**
     * Takes a snapshot, then disables the three serializers originally handed to the table
     * before writing key group 0. Writing must therefore not go through those original
     * serializer instances.
     *
     * @throws IOException if writing the key group fails
     */
    @Test
    public void testSerializerDuplicationInSnapshot() throws IOException {
        TestDuplicateSerializer namespaceSerializer = new TestDuplicateSerializer();
        TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();
        TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();

        RegisteredKeyValueStateBackendMetaInfo metaInfo =
                new RegisteredKeyValueStateBackendMetaInfo(
                        StateDescriptor.Type.VALUE, "test", namespaceSerializer, stateSerializer);
        CopyOnWriteStateTable table =
                new CopyOnWriteStateTable(new MockInternalKeyContext(), metaInfo, keySerializer);

        table.put(0, 0, 0, 0);
        table.put(1, 0, 0, 1);
        table.put(2, 0, 1, 2);

        StateSnapshot.StateKeyGroupWriter keyGroupWriter =
                table.stateSnapshot().getKeyGroupWriter();

        // Disable the originals only after the snapshot has been taken.
        namespaceSerializer.disable();
        keySerializer.disable();
        stateSerializer.disable();

        keyGroupWriter.writeStateInKeyGroup(
                new DataOutputViewStreamWrapper(
                        new ByteArrayOutputStreamWithPos(
                                StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE)),
                0);
    }

    /**
     * Writes every key group of a snapshot and checks that the tracked snapshot version is
     * dropped from each key group's state map right after it is written, and is absent from
     * all state maps after the snapshot is released.
     *
     * @throws IOException if writing a key group fails
     */
    @Test
    public void testReleaseForSuccessfulSnapshot() throws IOException {
        final int numberOfKeyGroups = 10;
        CopyOnWriteStateTable<Integer, Integer, Float> table =
                createStateTableForSnapshotRelease(numberOfKeyGroups);
        DataOutputViewStreamWrapper outputView =
                new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos());

        CopyOnWriteStateTableSnapshot snapshot = table.stateSnapshot();
        for (int keyGroup = 0; keyGroup < numberOfKeyGroups; keyGroup++) {
            snapshot.writeStateInKeyGroup(outputView, keyGroup);
            Assert.assertTrue(isResourceReleasedForKeyGroup(table, keyGroup));
        }

        snapshot.release();
        verifyResourceIsReleasedForAllKeyGroup(table, SNAPSHOT_VERSION);
    }

    /**
     * Simulates a snapshot that is abandoned half way: only the first half of the key groups
     * is written. The written key groups must have dropped the snapshot version, the
     * unwritten ones must still hold it, and after {@code release()} no state map may hold it.
     *
     * @throws IOException if writing a key group fails
     */
    @Test
    public void testReleaseForFailedSnapshot() throws IOException {
        final int numberOfKeyGroups = 10;
        final int writtenKeyGroups = numberOfKeyGroups / 2;
        CopyOnWriteStateTable<Integer, Integer, Float> table =
                createStateTableForSnapshotRelease(numberOfKeyGroups);
        DataOutputViewStreamWrapper outputView =
                new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos());

        CopyOnWriteStateTableSnapshot snapshot = table.stateSnapshot();
        for (int keyGroup = 0; keyGroup < writtenKeyGroups; keyGroup++) {
            snapshot.writeStateInKeyGroup(outputView, keyGroup);
            Assert.assertTrue(isResourceReleasedForKeyGroup(table, keyGroup));
        }
        for (int keyGroup = writtenKeyGroups; keyGroup < numberOfKeyGroups; keyGroup++) {
            Assert.assertFalse(isResourceReleasedForKeyGroup(table, keyGroup));
        }

        snapshot.release();
        // Check the very version that was asserted to be held above. The previous check used
        // version 2, which this test never observes, so it passed even if release() leaked
        // the resources of the unwritten key groups.
        verifyResourceIsReleasedForAllKeyGroup(table, SNAPSHOT_VERSION);
    }

    /**
     * Creates a table covering key groups {@code 0 .. i - 1} and fills it with 1000 entries.
     * Keys are the integers 0..999; namespaces and values are random.
     *
     * @param i number of key groups of the table
     * @return the populated state table
     */
    private CopyOnWriteStateTable<Integer, Integer, Float> createStateTableForSnapshotRelease(int i) {
        RegisteredKeyValueStateBackendMetaInfo metaInfo =
                new RegisteredKeyValueStateBackendMetaInfo(
                        StateDescriptor.Type.VALUE,
                        "test",
                        IntSerializer.INSTANCE,
                        FloatSerializer.INSTANCE);
        MockInternalKeyContext keyContext = new MockInternalKeyContext(0, i - 1, i);
        CopyOnWriteStateTable<Integer, Integer, Float> table =
                new CopyOnWriteStateTable<>(keyContext, metaInfo, IntSerializer.INSTANCE);

        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int key = 0; key < 1000; key++) {
            keyContext.setCurrentKeyAndKeyGroup(Integer.valueOf(key));
            table.put(Integer.valueOf(random.nextInt()), Float.valueOf(random.nextFloat()));
        }
        return table;
    }

    /**
     * Asserts that none of the table's state maps still lists the given snapshot version.
     *
     * @param copyOnWriteStateTable table whose state maps are inspected
     * @param i snapshot version that must no longer be registered
     */
    private void verifyResourceIsReleasedForAllKeyGroup(CopyOnWriteStateTable copyOnWriteStateTable, int i) {
        for (CopyOnWriteStateMap stateMap : copyOnWriteStateTable.getState()) {
            Assert.assertFalse(stateMap.getSnapshotVersions().contains(Integer.valueOf(i)));
        }
    }

    /**
     * Tells whether the state map of the given key group no longer lists
     * {@link #SNAPSHOT_VERSION} among its snapshot versions.
     *
     * @param copyOnWriteStateTable table that owns the key group
     * @param i key group index
     * @return {@code true} if the tracked snapshot version is not registered for the key group
     */
    private boolean isResourceReleasedForKeyGroup(CopyOnWriteStateTable copyOnWriteStateTable, int i) {
        return !copyOnWriteStateTable.getMapForKeyGroup(i).getSnapshotVersions().contains(SNAPSHOT_VERSION);
    }
}
