package org.apache.beam.runners.flink.streaming;

import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for the {@link BagState} handed out by {@link FlinkSplitStateInternals}.
 *
 * <p>Every test runs against a fresh {@link FlinkSplitStateInternals} instance that is rebuilt in
 * {@link #initStateInternals()}.
 */
@RunWith(JUnit4.class)
public class FlinkSplitStateInternalsTest {
    /** First namespace used to address state. */
    private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");

    /** Second namespace, used to check that state is kept separate per namespace. */
    private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");

    /** Tag identifying a bag of UTF-8 encoded strings named {@code "stringBag"}. */
    private static final StateTag<BagState<String>> STRING_BAG_ADDR =
            StateTags.bag("stringBag", StringUtf8Coder.of());

    /** The instance under test; re-created before each test method. */
    FlinkSplitStateInternals<String> underTest;

    /**
     * Builds a new {@link FlinkSplitStateInternals} on top of an operator state backend obtained
     * from a {@link MemoryStateBackend}.
     *
     * @throws RuntimeException wrapping any exception raised while creating the backend
     */
    @Before
    public void initStateInternals() {
        try {
            this.underTest =
                    new FlinkSplitStateInternals<>(
                            new MemoryStateBackend()
                                    .createOperatorStateBackend(new DummyEnvironment("test", 1, 0), ""));
        } catch (Exception e) {
            // Rethrow unchecked (cause preserved) so this fixture method declares no checked exceptions.
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks bag identity per namespace and that {@code read()} reflects {@code add()} and
     * {@code clear()}.
     */
    @Test
    public void testBag() throws Exception {
        BagState<String> state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);

        // Same namespace + tag must yield an equal state object; a different namespace must not.
        Assert.assertEquals(state, this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
        // 'state' goes first so that state.equals(other) is the comparison being evaluated.
        Assert.assertNotEquals(state, this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR));

        Assert.assertThat(state.read(), Matchers.emptyIterable());
        state.add("hello");
        Assert.assertThat(state.read(), Matchers.containsInAnyOrder("hello"));

        state.add("world");
        Assert.assertThat(state.read(), Matchers.containsInAnyOrder("hello", "world"));

        state.clear();
        Assert.assertThat(state.read(), Matchers.emptyIterable());
        // Clearing the contents must not change which state object the address resolves to.
        Assert.assertEquals(this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR), state);
    }

    /**
     * Checks that a {@link ReadableState} obtained from {@code isEmpty()} before the bag is
     * modified still reports the current emptiness after {@code add()} and {@code clear()}.
     */
    @Test
    public void testBagIsEmpty() throws Exception {
        BagState<String> state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);

        Assert.assertThat(state.isEmpty().read(), Matchers.is(true));

        // Captured before the add on purpose: the reads below go through this earlier handle.
        ReadableState<Boolean> isEmpty = state.isEmpty();
        state.add("hello");
        Assert.assertThat(isEmpty.read(), Matchers.is(false));

        state.clear();
        Assert.assertThat(isEmpty.read(), Matchers.is(true));
    }
}
