package org.apache.beam.repackaged.direct_java.runners.core;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StateInternalsTest.class */
public abstract class StateInternalsTest {
    private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
    private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
    private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
    private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of());
    private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal("sumInteger", VarIntCoder.of(), Sum.ofIntegers());
    private static final StateTag<CombiningState<Integer, Integer, Integer>> SUM_INTEGER_CONTEXT_ADDR = StateTags.combiningValueWithContext("sumIntegerWithContext", VarIntCoder.of(), new SummingContextFn());
    private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of());
    private static final StateTag<SetState<String>> STRING_SET_ADDR = StateTags.set("stringSet", StringUtf8Coder.of());
    private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
    private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
    private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
    private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
    private static final StateTag<BagState<String>> STRING_BAG_ADDR1 = StateTags.bag("badStringBag", new StringCoderWithIdentityEquality());
    private static final StateTag<BagState<String>> STRING_BAG_ADDR2 = StateTags.bag("badStringBag", new StringCoderWithIdentityEquality());
    private StateInternals underTest;

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StateInternalsTest$MapEntry.class */
    private static class MapEntry<K, V> implements Map.Entry<K, V> {
        private K key;
        private V value;

        private MapEntry(K k, V v) {
            this.key = k;
            this.value = v;
        }

        static <K, V> Map.Entry<K, V> of(K k, V v) {
            return new MapEntry(k, v);
        }

        @Override // java.util.Map.Entry
        public final K getKey() {
            return this.key;
        }

        @Override // java.util.Map.Entry
        public final V getValue() {
            return this.value;
        }

        public final String toString() {
            return this.key + "=" + this.value;
        }

        @Override // java.util.Map.Entry
        public final int hashCode() {
            return Objects.hashCode(this.key) ^ Objects.hashCode(this.value);
        }

        @Override // java.util.Map.Entry
        public final V setValue(V v) {
            V v2 = this.value;
            this.value = v;
            return v2;
        }

        @Override // java.util.Map.Entry
        public final boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Map.Entry)) {
                return false;
            }
            Map.Entry entry = (Map.Entry) obj;
            return Objects.equals(this.key, entry.getKey()) && Objects.equals(this.value, entry.getValue());
        }
    }

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StateInternalsTest$StringCoderWithIdentityEquality.class */
    private static class StringCoderWithIdentityEquality extends Coder<String> {
        private final StringUtf8Coder realCoder;

        private StringCoderWithIdentityEquality() {
            this.realCoder = StringUtf8Coder.of();
        }

        public void encode(String str, OutputStream outputStream) throws CoderException, IOException {
            this.realCoder.encode(str, outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public String m68decode(InputStream inputStream) throws CoderException, IOException {
            return this.realCoder.decode(inputStream);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return null;
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }

        public boolean equals(Object obj) {
            return obj == this;
        }

        public int hashCode() {
            return StateInternalsTest.super.hashCode();
        }
    }

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StateInternalsTest$SummingContextFn.class */
    private static class SummingContextFn extends CombineWithContext.CombineFnWithContext<Integer, Integer, Integer> {
        private SummingContextFn() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m70createAccumulator(CombineWithContext.Context context) {
            return 0;
        }

        public Integer addInput(Integer num, Integer num2, CombineWithContext.Context context) {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }

        public Integer mergeAccumulators(Iterable<Integer> iterable, CombineWithContext.Context context) {
            int intValue = m70createAccumulator(context).intValue();
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                intValue += it.next().intValue();
            }
            return Integer.valueOf(intValue);
        }

        public Integer extractOutput(Integer num, CombineWithContext.Context context) {
            return num;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m69mergeAccumulators(Iterable iterable, CombineWithContext.Context context) {
            return mergeAccumulators((Iterable<Integer>) iterable, context);
        }
    }

    @Before
    public void setUp() {
        this.underTest = createStateInternals();
    }

    protected abstract StateInternals createStateInternals();

    @Test
    public void testValue() throws Exception {
        ValueState state = this.underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.equalTo(state));
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), Matchers.not(Matchers.equalTo(state)));
        MatcherAssert.assertThat((String) state.read(), Matchers.nullValue());
        state.write("hello");
        MatcherAssert.assertThat((String) state.read(), Matchers.equalTo("hello"));
        state.write("world");
        MatcherAssert.assertThat((String) state.read(), Matchers.equalTo("world"));
        state.clear();
        MatcherAssert.assertThat((String) state.read(), Matchers.nullValue());
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testBag() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        MatcherAssert.assertThat(state, Matchers.equalTo(this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR)));
        MatcherAssert.assertThat(state, Matchers.not(Matchers.equalTo(this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR))));
        MatcherAssert.assertThat(state.read(), Matchers.emptyIterable());
        state.add("hello");
        MatcherAssert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"hello"}));
        state.add("world");
        MatcherAssert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"hello", "world"}));
        state.clear();
        MatcherAssert.assertThat(state.read(), Matchers.emptyIterable());
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testBagIsEmpty() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        MatcherAssert.assertThat((Boolean) state.isEmpty().read(), Matchers.is(true));
        ReadableState isEmpty = state.isEmpty();
        state.add("hello");
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(false));
        state.clear();
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(true));
    }

    @Test
    public void testMergeBagIntoSource() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state2 = this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
        state.add("Hello");
        state2.add("World");
        state.add("!");
        StateMerging.mergeBags(Arrays.asList(state, state2), state);
        MatcherAssert.assertThat(state.read(), Matchers.containsInAnyOrder(new String[]{"Hello", "World", "!"}));
        MatcherAssert.assertThat(state2.read(), Matchers.emptyIterable());
    }

    @Test
    public void testMergeBagIntoNewNamespace() throws Exception {
        BagState state = this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
        BagState state2 = this.underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
        BagState state3 = this.underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
        state.add("Hello");
        state2.add("World");
        state.add("!");
        StateMerging.mergeBags(Arrays.asList(state, state2, state3), state3);
        MatcherAssert.assertThat(state3.read(), Matchers.containsInAnyOrder(new String[]{"Hello", "World", "!"}));
        MatcherAssert.assertThat(state.read(), Matchers.emptyIterable());
        MatcherAssert.assertThat(state2.read(), Matchers.emptyIterable());
    }

    @Test
    public void testSet() throws Exception {
        SetState state = this.underTest.state(NAMESPACE_1, STRING_SET_ADDR);
        MatcherAssert.assertThat(state, Matchers.equalTo(this.underTest.state(NAMESPACE_1, STRING_SET_ADDR)));
        MatcherAssert.assertThat(state, Matchers.not(Matchers.equalTo(this.underTest.state(NAMESPACE_2, STRING_SET_ADDR))));
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.emptyIterable());
        Assert.assertFalse(((Boolean) state.contains("A").read()).booleanValue());
        state.add("A");
        state.add("B");
        state.add("A");
        Assert.assertFalse(((Boolean) state.addIfAbsent("B").read()).booleanValue());
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.containsInAnyOrder(new String[]{"A", "B"}));
        state.remove("A");
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.containsInAnyOrder(new String[]{"B"}));
        state.remove("C");
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.containsInAnyOrder(new String[]{"B"}));
        Assert.assertFalse(((Boolean) state.contains("A").read()).booleanValue());
        Assert.assertTrue(((Boolean) state.contains("B").read()).booleanValue());
        state.add("C");
        state.add("D");
        MatcherAssert.assertThat((Iterable) state.readLater().read(), Matchers.containsInAnyOrder(new String[]{"B", "C", "D"}));
        SetState readLater = state.readLater();
        MatcherAssert.assertThat((Iterable) readLater.read(), Matchers.hasItems(new String[]{"C", "D"}));
        Assert.assertFalse(((Boolean) readLater.contains("A").read()).booleanValue());
        state.clear();
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.emptyIterable());
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, STRING_SET_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testSetIsEmpty() throws Exception {
        SetState state = this.underTest.state(NAMESPACE_1, STRING_SET_ADDR);
        MatcherAssert.assertThat((Boolean) state.isEmpty().read(), Matchers.is(true));
        ReadableState isEmpty = state.isEmpty();
        state.add("hello");
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(false));
        state.clear();
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(true));
    }

    @Test
    public void testMergeSetIntoSource() throws Exception {
        SetState state = this.underTest.state(NAMESPACE_1, STRING_SET_ADDR);
        SetState state2 = this.underTest.state(NAMESPACE_2, STRING_SET_ADDR);
        state.add("Hello");
        state2.add("Hello");
        state2.add("World");
        state.add("!");
        StateMerging.mergeSets(Arrays.asList(state, state2), state);
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.containsInAnyOrder(new String[]{"Hello", "World", "!"}));
        MatcherAssert.assertThat((Iterable) state2.read(), Matchers.emptyIterable());
    }

    @Test
    public void testMergeSetIntoNewNamespace() throws Exception {
        SetState state = this.underTest.state(NAMESPACE_1, STRING_SET_ADDR);
        SetState state2 = this.underTest.state(NAMESPACE_2, STRING_SET_ADDR);
        SetState state3 = this.underTest.state(NAMESPACE_3, STRING_SET_ADDR);
        state.add("Hello");
        state2.add("Hello");
        state2.add("World");
        state.add("!");
        StateMerging.mergeSets(Arrays.asList(state, state2, state3), state3);
        MatcherAssert.assertThat((Iterable) state3.read(), Matchers.containsInAnyOrder(new String[]{"Hello", "World", "!"}));
        MatcherAssert.assertThat((Iterable) state.read(), Matchers.emptyIterable());
        MatcherAssert.assertThat((Iterable) state2.read(), Matchers.emptyIterable());
    }

    @Test
    public void testMap() throws Exception {
        MapState state = this.underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
        MatcherAssert.assertThat(state, Matchers.equalTo(this.underTest.state(NAMESPACE_1, STRING_MAP_ADDR)));
        MatcherAssert.assertThat(state, Matchers.not(Matchers.equalTo(this.underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
        MatcherAssert.assertThat((Iterable) state.entries().read(), Matchers.emptyIterable());
        state.put("A", 1);
        state.put("B", 2);
        state.put("A", 11);
        MatcherAssert.assertThat((Integer) state.putIfAbsent("B", 22).read(), Matchers.equalTo(2));
        MatcherAssert.assertThat((Iterable) state.entries().read(), Matchers.containsInAnyOrder(new Map.Entry[]{MapEntry.of("A", 11), MapEntry.of("B", 2)}));
        state.remove("A");
        MatcherAssert.assertThat((Iterable) state.entries().read(), Matchers.containsInAnyOrder(new Map.Entry[]{MapEntry.of("B", 2)}));
        state.remove("C");
        MatcherAssert.assertThat((Iterable) state.entries().read(), Matchers.containsInAnyOrder(new Map.Entry[]{MapEntry.of("B", 2)}));
        Assert.assertNull(state.get("A").read());
        MatcherAssert.assertThat((Integer) state.get("B").read(), Matchers.equalTo(2));
        state.put("C", 3);
        state.put("D", 4);
        MatcherAssert.assertThat((Integer) state.get("C").read(), Matchers.equalTo(3));
        state.put("E", 5);
        state.remove("C");
        MatcherAssert.assertThat((Iterable) state.keys().read(), Matchers.containsInAnyOrder(new String[]{"B", "D", "E"}));
        MatcherAssert.assertThat((Iterable) state.values().read(), Matchers.containsInAnyOrder(new Integer[]{2, 4, 5}));
        MatcherAssert.assertThat((Iterable) state.entries().read(), Matchers.containsInAnyOrder(new Map.Entry[]{MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)}));
        MatcherAssert.assertThat((Integer) state.get("B").readLater().read(), Matchers.equalTo(2));
        Assert.assertNull(state.get("A").readLater().read());
        MatcherAssert.assertThat((Iterable) state.entries().readLater().read(), Matchers.containsInAnyOrder(new Map.Entry[]{MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)}));
        state.clear();
        MatcherAssert.assertThat((Iterable) state.entries().read(), Matchers.emptyIterable());
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testCombiningValue() throws Exception {
        GroupingState state = this.underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
        Assert.assertEquals(state, this.underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
        Assert.assertFalse(state.equals(this.underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(0));
        state.add(2);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(2));
        state.add(3);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(5));
        state.clear();
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(0));
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testCombiningIsEmpty() throws Exception {
        GroupingState state = this.underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
        MatcherAssert.assertThat((Boolean) state.isEmpty().read(), Matchers.is(true));
        ReadableState isEmpty = state.isEmpty();
        state.add(5);
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(false));
        state.clear();
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(true));
    }

    @Test
    public void testMergeCombiningValueIntoSource() throws Exception {
        CombiningState state = this.underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
        CombiningState state2 = this.underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
        MatcherAssert.assertThat((int[]) state.getAccum(), Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat((int[]) state2.getAccum(), Matchers.is(Matchers.notNullValue()));
        state.add(5);
        state2.add(10);
        state.add(6);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(11));
        MatcherAssert.assertThat((Integer) state2.read(), Matchers.equalTo(10));
        StateMerging.mergeCombiningValues(Arrays.asList(state, state2), state);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(21));
        MatcherAssert.assertThat((Integer) state2.read(), Matchers.equalTo(0));
    }

    @Test
    public void testMergeCombiningValueIntoNewNamespace() throws Exception {
        CombiningState state = this.underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
        CombiningState state2 = this.underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
        CombiningState state3 = this.underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
        MatcherAssert.assertThat((int[]) state.getAccum(), Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat((int[]) state2.getAccum(), Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat((int[]) state3.getAccum(), Matchers.is(Matchers.notNullValue()));
        state.add(5);
        state2.add(10);
        state.add(6);
        StateMerging.mergeCombiningValues(Arrays.asList(state, state2), state3);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(0));
        MatcherAssert.assertThat((Integer) state2.read(), Matchers.equalTo(0));
        MatcherAssert.assertThat((Integer) state3.read(), Matchers.equalTo(21));
    }

    @Test
    public void testMergeCombiningWithContextValueIntoSource() throws Exception {
        CombiningState state = this.underTest.state(NAMESPACE_1, SUM_INTEGER_CONTEXT_ADDR);
        CombiningState state2 = this.underTest.state(NAMESPACE_2, SUM_INTEGER_CONTEXT_ADDR);
        MatcherAssert.assertThat((Integer) state.getAccum(), Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat((Integer) state2.getAccum(), Matchers.is(Matchers.notNullValue()));
        state.add(5);
        state2.add(10);
        state.add(6);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(11));
        MatcherAssert.assertThat((Integer) state2.read(), Matchers.equalTo(10));
        StateMerging.mergeCombiningValues(Arrays.asList(state, state2), state);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(21));
        MatcherAssert.assertThat((Integer) state2.read(), Matchers.equalTo(0));
    }

    @Test
    public void testMergeCombiningWithContextValueIntoNewNamespace() throws Exception {
        CombiningState state = this.underTest.state(NAMESPACE_1, SUM_INTEGER_CONTEXT_ADDR);
        CombiningState state2 = this.underTest.state(NAMESPACE_2, SUM_INTEGER_CONTEXT_ADDR);
        CombiningState state3 = this.underTest.state(NAMESPACE_3, SUM_INTEGER_CONTEXT_ADDR);
        MatcherAssert.assertThat((Integer) state.getAccum(), Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat((Integer) state2.getAccum(), Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat((Integer) state3.getAccum(), Matchers.is(Matchers.notNullValue()));
        state.add(5);
        state2.add(10);
        state.add(6);
        StateMerging.mergeCombiningValues(Arrays.asList(state, state2), state3);
        MatcherAssert.assertThat((Integer) state.read(), Matchers.equalTo(0));
        MatcherAssert.assertThat((Integer) state2.read(), Matchers.equalTo(0));
        MatcherAssert.assertThat((Integer) state3.read(), Matchers.equalTo(21));
    }

    @Test
    public void testWatermarkEarliestState() throws Exception {
        WatermarkHoldState state = this.underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
        Assert.assertEquals(state, this.underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
        Assert.assertFalse(state.equals(this.underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.nullValue());
        state.add(new Instant(2000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(2000L)));
        state.add(new Instant(3000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(2000L)));
        state.add(new Instant(1000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(1000L)));
        state.clear();
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo((Object) null));
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testWatermarkLatestState() throws Exception {
        WatermarkHoldState state = this.underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
        Assert.assertEquals(state, this.underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
        Assert.assertFalse(state.equals(this.underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.nullValue());
        state.add(new Instant(2000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(2000L)));
        state.add(new Instant(3000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(3000L)));
        state.add(new Instant(1000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(3000L)));
        state.clear();
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo((Object) null));
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testWatermarkEndOfWindowState() throws Exception {
        WatermarkHoldState state = this.underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
        Assert.assertEquals(state, this.underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
        Assert.assertFalse(state.equals(this.underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.nullValue());
        state.add(new Instant(2000L));
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo(new Instant(2000L)));
        state.clear();
        MatcherAssert.assertThat((Instant) state.read(), Matchers.equalTo((Object) null));
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.equalTo(state));
    }

    @Test
    public void testWatermarkStateIsEmpty() throws Exception {
        WatermarkHoldState state = this.underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
        MatcherAssert.assertThat((Boolean) state.isEmpty().read(), Matchers.is(true));
        ReadableState isEmpty = state.isEmpty();
        state.add(new Instant(1000L));
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(false));
        state.clear();
        MatcherAssert.assertThat((Boolean) isEmpty.read(), Matchers.is(true));
    }

    @Test
    public void testSetReadable() throws Exception {
        SetState state = this.underTest.state(NAMESPACE_1, STRING_SET_ADDR);
        ReadableState contains = state.contains("A");
        state.add("A");
        Assert.assertFalse(((Boolean) contains.read()).booleanValue());
        state.addIfAbsent("B");
        Assert.assertTrue(((Boolean) state.contains("B").read()).booleanValue());
    }

    @Test
    public void testMapReadable() throws Exception {
        MapState state = this.underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
        ReadableState keys = state.keys();
        ReadableState values = state.values();
        ReadableState entries = state.entries();
        state.put("A", 1);
        Assert.assertFalse(Iterables.isEmpty((Iterable) keys.read()));
        Assert.assertFalse(Iterables.isEmpty((Iterable) values.read()));
        Assert.assertFalse(Iterables.isEmpty((Iterable) entries.read()));
        ReadableState readableState = state.get("B");
        state.put("B", 2);
        MatcherAssert.assertThat((Integer) readableState.read(), Matchers.equalTo(2));
        state.putIfAbsent("C", 3);
        MatcherAssert.assertThat((Integer) state.get("C").read(), Matchers.equalTo(3));
    }

    @Test
    public void testBagWithBadCoderEquality() throws Exception {
        MatcherAssert.assertThat(new StringCoderWithIdentityEquality(), Matchers.not(Matchers.equalTo(new StringCoderWithIdentityEquality())));
        this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR1).add("hello");
        MatcherAssert.assertThat(this.underTest.state(NAMESPACE_1, STRING_BAG_ADDR2).read(), Matchers.containsInAnyOrder(new String[]{"hello"}));
    }
}
