package org.apache.beam.runners.flink.streaming;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.class */
public class FlinkKeyGroupStateInternalsTest {
    private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
    private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
    private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
    private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of());
    FlinkKeyGroupStateInternals<String> underTest;
    private KeyedStateBackend keyedStateBackend;

    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest$TestKeyContext.class */
    private static class TestKeyContext implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object obj) {
            this.key = obj;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }

    @Before
    public void initStateInternals() {
        try {
            this.keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1));
            this.underTest = new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), this.keyedStateBackend);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private KeyedStateBackend getKeyedStateBackend(int i, KeyGroupRange keyGroupRange) {
        try {
            AbstractKeyedStateBackend createKeyedStateBackend = new MemoryStateBackend().createKeyedStateBackend(new DummyEnvironment("test", 1, 0), new JobID(), "test_op", new GenericTypeInfo(ByteBuffer.class).createSerializer(new ExecutionConfig()), i, keyGroupRange, new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
            createKeyedStateBackend.setCurrentKey(ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "1")));
            return createKeyedStateBackend;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testBag() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        Assert.assertEquals(state, this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
        Assert.assertFalse(state.equals(this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
        Assert.assertThat(state.read(), Matchers.emptyIterable());
        state.add("hello");
        Assert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"hello"}));
        state.add("world");
        Assert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"hello", "world"}));
        state.clear();
        Assert.assertThat(state.read(), Matchers.emptyIterable());
        Assert.assertEquals(this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR), state);
    }

    @Test
    public void testBagIsEmpty() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        Assert.assertThat(state.isEmpty().read(), Matchers.is(true));
        ReadableState isEmpty = state.isEmpty();
        state.add("hello");
        Assert.assertThat(isEmpty.read(), Matchers.is(false));
        state.clear();
        Assert.assertThat(isEmpty.read(), Matchers.is(true));
    }

    @Test
    public void testMergeBagIntoSource() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state2 = this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
        state.add("Hello");
        state2.add("World");
        state.add("!");
        StateMerging.mergeBags(Arrays.asList(state, state2), state);
        Assert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"Hello", "World", "!"}));
        Assert.assertThat(state2.read(), Matchers.emptyIterable());
    }

    @Test
    public void testMergeBagIntoNewNamespace() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state2 = this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
        BagState state3 = this.underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
        state.add("Hello");
        state2.add("World");
        state.add("!");
        StateMerging.mergeBags(Arrays.asList(state, state2, state3), state3);
        Assert.assertThat(state3.read(), Matchers.containsInAnyOrder(new String[]{"Hello", "World", "!"}));
        Assert.assertThat(state.read(), Matchers.emptyIterable());
        Assert.assertThat(state2.read(), Matchers.emptyIterable());
    }

    @Test
    public void testKeyGroupAndCheckpoint() throws Exception {
        ByteBuffer wrap = ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "11111111"));
        ByteBuffer wrap2 = ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "22222222"));
        KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1));
        FlinkKeyGroupStateInternals flinkKeyGroupStateInternals = new FlinkKeyGroupStateInternals(StringUtf8Coder.of(), keyedStateBackend);
        BagState state = flinkKeyGroupStateInternals.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state2 = flinkKeyGroupStateInternals.state(NAMESPACE_2, STRING_BAG_ADDR);
        keyedStateBackend.setCurrentKey(wrap);
        state.add("0");
        state2.add("2");
        keyedStateBackend.setCurrentKey(wrap2);
        state.add("1");
        state2.add("3");
        Assert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"0", "1"}));
        Assert.assertThat(state2.read(), Matchers.containsInAnyOrder(new String[]{"2", "3"}));
        ClassLoader classLoader = FlinkKeyGroupStateInternalsTest.class.getClassLoader();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        flinkKeyGroupStateInternals.snapshotKeyGroupState(0, new DataOutputStream(byteArrayOutputStream));
        DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
        FlinkKeyGroupStateInternals flinkKeyGroupStateInternals2 = new FlinkKeyGroupStateInternals(StringUtf8Coder.of(), getKeyedStateBackend(2, new KeyGroupRange(0, 0)));
        flinkKeyGroupStateInternals2.restoreKeyGroupState(0, dataInputStream, classLoader);
        BagState state3 = flinkKeyGroupStateInternals2.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state4 = flinkKeyGroupStateInternals2.state(NAMESPACE_2, STRING_BAG_ADDR);
        Assert.assertThat(state3.read(), Matchers.containsInAnyOrder(new String[]{"0"}));
        Assert.assertThat(state4.read(), Matchers.containsInAnyOrder(new String[]{"2"}));
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        flinkKeyGroupStateInternals.snapshotKeyGroupState(1, new DataOutputStream(byteArrayOutputStream2));
        DataInputStream dataInputStream2 = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream2.toByteArray()));
        FlinkKeyGroupStateInternals flinkKeyGroupStateInternals3 = new FlinkKeyGroupStateInternals(StringUtf8Coder.of(), getKeyedStateBackend(2, new KeyGroupRange(1, 1)));
        flinkKeyGroupStateInternals3.restoreKeyGroupState(1, dataInputStream2, classLoader);
        BagState state5 = flinkKeyGroupStateInternals3.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state6 = flinkKeyGroupStateInternals3.state(NAMESPACE_2, STRING_BAG_ADDR);
        Assert.assertThat(state5.read(), Matchers.containsInAnyOrder(new String[]{"1"}));
        Assert.assertThat(state6.read(), Matchers.containsInAnyOrder(new String[]{"3"}));
        FlinkKeyGroupStateInternals flinkKeyGroupStateInternals4 = new FlinkKeyGroupStateInternals(StringUtf8Coder.of(), getKeyedStateBackend(2, new KeyGroupRange(0, 1)));
        dataInputStream.reset();
        dataInputStream2.reset();
        flinkKeyGroupStateInternals4.restoreKeyGroupState(0, dataInputStream, classLoader);
        flinkKeyGroupStateInternals4.restoreKeyGroupState(1, dataInputStream2, classLoader);
        BagState state7 = flinkKeyGroupStateInternals4.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state8 = flinkKeyGroupStateInternals4.state(NAMESPACE_2, STRING_BAG_ADDR);
        Assert.assertThat(state7.read(), Matchers.containsInAnyOrder(new String[]{"0", "1"}));
        Assert.assertThat(state8.read(), Matchers.containsInAnyOrder(new String[]{"2", "3"}));
    }
}
