/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.streaming;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.UUID;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsTest;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class FlinkStateInternalsTest
extends StateInternalsTest {
    protected @UnknownKeyFor @NonNull @Initialized StateInternals createStateInternals() {
        try {
            KeyedStateBackend<ByteBuffer> keyedStateBackend = FlinkStateInternalsTest.createStateBackend();
            return new FlinkStateInternals(keyedStateBackend, (Coder)StringUtf8Coder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testWatermarkHoldsPersistence() throws @UnknownKeyFor @NonNull @Initialized Exception {
        KeyedStateBackend<ByteBuffer> keyedStateBackend = FlinkStateInternalsTest.createStateBackend();
        FlinkStateInternals stateInternals = new FlinkStateInternals(keyedStateBackend, (Coder)StringUtf8Coder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        StateTag stateTag = StateTags.watermarkStateInternal((String)"hold", (TimestampCombiner)TimestampCombiner.EARLIEST);
        WatermarkHoldState globalWindow = (WatermarkHoldState)stateInternals.state(StateNamespaces.global(), stateTag);
        WatermarkHoldState fixedWindow = (WatermarkHoldState)stateInternals.state(StateNamespaces.window((Coder)IntervalWindow.getCoder(), (BoundedWindow)new IntervalWindow(new Instant(0L), new Instant(10L))), stateTag);
        Instant noHold = new Instant(Long.MAX_VALUE);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)noHold.getMillis()));
        Instant high = new Instant(10L);
        globalWindow.add((Object)high);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)high.getMillis()));
        Instant middle = new Instant(5L);
        fixedWindow.add((Object)middle);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)middle.getMillis()));
        Instant low = new Instant(1L);
        globalWindow.add((Object)low);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)low.getMillis()));
        globalWindow.add((Object)high);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)low.getMillis()));
        fixedWindow.add((Object)high);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)low.getMillis()));
        ByteBuffer firstKey = (ByteBuffer)keyedStateBackend.getCurrentKey();
        FlinkStateInternalsTest.changeKey(keyedStateBackend);
        ByteBuffer secondKey = (ByteBuffer)keyedStateBackend.getCurrentKey();
        MatcherAssert.assertThat((Object)firstKey, (Matcher)Is.is((Matcher)Matchers.not((Object)secondKey)));
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)low.getMillis()));
        MatcherAssert.assertThat((Object)((Instant)globalWindow.read()), (Matcher)Is.is((Matcher)Matchers.nullValue()));
        MatcherAssert.assertThat((Object)((Instant)fixedWindow.read()), (Matcher)Is.is((Matcher)Matchers.nullValue()));
        globalWindow.add((Object)middle);
        fixedWindow.add((Object)high);
        MatcherAssert.assertThat((Object)((Instant)globalWindow.read()), (Matcher)Is.is((Object)middle));
        MatcherAssert.assertThat((Object)((Instant)fixedWindow.read()), (Matcher)Is.is((Object)high));
        keyedStateBackend.setCurrentKey((Object)firstKey);
        MatcherAssert.assertThat((Object)((Instant)globalWindow.read()), (Matcher)Is.is((Object)low));
        MatcherAssert.assertThat((Object)((Instant)fixedWindow.read()), (Matcher)Is.is((Object)middle));
        stateInternals = new FlinkStateInternals(keyedStateBackend, (Coder)StringUtf8Coder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        globalWindow = (WatermarkHoldState)stateInternals.state(StateNamespaces.global(), stateTag);
        fixedWindow = (WatermarkHoldState)stateInternals.state(StateNamespaces.window((Coder)IntervalWindow.getCoder(), (BoundedWindow)new IntervalWindow(new Instant(0L), new Instant(10L))), stateTag);
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)low.getMillis()));
        keyedStateBackend.setCurrentKey((Object)secondKey);
        MatcherAssert.assertThat((Object)((Instant)globalWindow.read()), (Matcher)Is.is((Object)middle));
        MatcherAssert.assertThat((Object)((Instant)fixedWindow.read()), (Matcher)Is.is((Object)high));
        globalWindow.clear();
        fixedWindow.clear();
        keyedStateBackend.setCurrentKey((Object)firstKey);
        MatcherAssert.assertThat((Object)((Instant)globalWindow.read()), (Matcher)Is.is((Object)low));
        MatcherAssert.assertThat((Object)((Instant)fixedWindow.read()), (Matcher)Is.is((Object)middle));
        fixedWindow.clear();
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)low.getMillis()));
        globalWindow.clear();
        MatcherAssert.assertThat((Object)stateInternals.minWatermarkHoldMs(), (Matcher)Is.is((Object)noHold.getMillis()));
    }

    @Test
    public void testGlobalWindowWatermarkHoldClear() throws @UnknownKeyFor @NonNull @Initialized Exception {
        KeyedStateBackend<ByteBuffer> keyedStateBackend = FlinkStateInternalsTest.createStateBackend();
        FlinkStateInternals stateInternals = new FlinkStateInternals(keyedStateBackend, (Coder)StringUtf8Coder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        StateTag stateTag = StateTags.watermarkStateInternal((String)"hold", (TimestampCombiner)TimestampCombiner.EARLIEST);
        Instant now = Instant.now();
        WatermarkHoldState state = (WatermarkHoldState)stateInternals.state(StateNamespaces.global(), stateTag);
        state.add((Object)now);
        stateInternals.clearGlobalState();
        MatcherAssert.assertThat((Object)((Instant)state.read()), (Matcher)Is.is((Object)null));
    }

    public static @UnknownKeyFor @NonNull @Initialized KeyedStateBackend<@UnknownKeyFor @NonNull @Initialized ByteBuffer> createStateBackend() throws @UnknownKeyFor @NonNull @Initialized Exception {
        MemoryStateBackend backend = new MemoryStateBackend();
        AbstractKeyedStateBackend keyedStateBackend = backend.createKeyedStateBackend((Environment)new DummyEnvironment("test", 1, 0), new JobID(), "test_op", new GenericTypeInfo(ByteBuffer.class).createSerializer(new ExecutionConfig()), 2, new KeyGroupRange(0, 1), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), TtlTimeProvider.DEFAULT, null, Collections.emptyList(), new CloseableRegistry());
        FlinkStateInternalsTest.changeKey((KeyedStateBackend<ByteBuffer>)keyedStateBackend);
        return keyedStateBackend;
    }

    private static void changeKey(@UnknownKeyFor @NonNull @Initialized KeyedStateBackend<@UnknownKeyFor @NonNull @Initialized ByteBuffer> keyedStateBackend) throws @UnknownKeyFor @NonNull @Initialized CoderException {
        keyedStateBackend.setCurrentKey((Object)ByteBuffer.wrap(CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)UUID.randomUUID().toString())));
    }
}

