package org.apache.beam.runners.flink.streaming;

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsTest;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.class */
public class FlinkStateInternalsTest extends StateInternalsTest {
    protected StateInternals createStateInternals() {
        try {
            return new FlinkStateInternals(createStateBackend(), StringUtf8Coder.of());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testWatermarkHoldsPersistence() throws Exception {
        KeyedStateBackend<ByteBuffer> createStateBackend = createStateBackend();
        FlinkStateInternals flinkStateInternals = new FlinkStateInternals(createStateBackend, StringUtf8Coder.of());
        StateTag watermarkStateInternal = StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
        WatermarkHoldState state = flinkStateInternals.state(StateNamespaces.global(), watermarkStateInternal);
        WatermarkHoldState state2 = flinkStateInternals.state(StateNamespaces.window(IntervalWindow.getCoder(), new IntervalWindow(new Instant(0L), new Instant(10L))), watermarkStateInternal);
        Instant instant = new Instant(Long.MAX_VALUE);
        MatcherAssert.assertThat(flinkStateInternals.watermarkHold(), Is.is(instant));
        Instant instant2 = new Instant(10L);
        state.add(instant2);
        MatcherAssert.assertThat(flinkStateInternals.watermarkHold(), Is.is(instant2));
        Instant instant3 = new Instant(5L);
        state2.add(instant3);
        MatcherAssert.assertThat(flinkStateInternals.watermarkHold(), Is.is(instant3));
        Instant instant4 = new Instant(1L);
        state.add(instant4);
        MatcherAssert.assertThat(flinkStateInternals.watermarkHold(), Is.is(instant4));
        state.add(instant2);
        MatcherAssert.assertThat(flinkStateInternals.watermarkHold(), Is.is(instant4));
        state2.add(instant2);
        MatcherAssert.assertThat(flinkStateInternals.watermarkHold(), Is.is(instant4));
        changeKey(createStateBackend);
        FlinkStateInternals flinkStateInternals2 = new FlinkStateInternals(createStateBackend, StringUtf8Coder.of());
        WatermarkHoldState state3 = flinkStateInternals2.state(StateNamespaces.global(), watermarkStateInternal);
        WatermarkHoldState state4 = flinkStateInternals2.state(StateNamespaces.window(IntervalWindow.getCoder(), new IntervalWindow(new Instant(0L), new Instant(10L))), watermarkStateInternal);
        MatcherAssert.assertThat(flinkStateInternals2.watermarkHold(), Is.is(instant4));
        state4.clear();
        MatcherAssert.assertThat(flinkStateInternals2.watermarkHold(), Is.is(instant4));
        state3.clear();
        MatcherAssert.assertThat(flinkStateInternals2.watermarkHold(), Is.is(instant));
    }

    private KeyedStateBackend<ByteBuffer> createStateBackend() throws Exception {
        AbstractKeyedStateBackend createKeyedStateBackend = new MemoryStateBackend().createKeyedStateBackend(new DummyEnvironment("test", 1, 0), new JobID(), "test_op", new GenericTypeInfo(ByteBuffer.class).createSerializer(new ExecutionConfig()), 2, new KeyGroupRange(0, 1), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
        changeKey(createKeyedStateBackend);
        return createKeyedStateBackend;
    }

    private void changeKey(KeyedStateBackend<ByteBuffer> keyedStateBackend) throws CoderException {
        keyedStateBackend.setCurrentKey(ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), UUID.randomUUID().toString())));
    }
}
