/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.output;

import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class KeyedStateBootstrapOperatorTest {
    private static final ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor("state", Types.LONG);
    private static final Long EVENT_TIMER = 0x7FFFFFFFFFFFFFFEL;
    private static final Long PROC_TIMER = 0x7FFFFFFFFFFFFFFDL;
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    @Test
    public void testTimerStateRestorable() throws Exception {
        OperatorSubtaskState state;
        Path path = new Path(this.folder.newFolder().toURI());
        KeyedStateBootstrapOperator bootstrapOperator = new KeyedStateBootstrapOperator(0L, path, (KeyedStateBootstrapFunction)new TimerBootstrapFunction());
        try (KeyedOneInputStreamOperatorTestHarness harness = this.getHarness((OneInputStreamOperator)bootstrapOperator);){
            this.processElements(harness, 1L, 2L, 3L);
            state = this.getState((KeyedStateBootstrapOperator<Long, Long>)bootstrapOperator, harness);
        }
        KeyedProcessOperator procOperator = new KeyedProcessOperator((KeyedProcessFunction)new SimpleProcessFunction());
        try (KeyedOneInputStreamOperatorTestHarness harness = this.getHarness((OneInputStreamOperator)procOperator, state);){
            harness.processWatermark(EVENT_TIMER.longValue());
            harness.setProcessingTime(PROC_TIMER.longValue());
            this.assertHarnessOutput(harness, Tuple3.of((Object)1L, (Object)EVENT_TIMER, (Object)TimeDomain.EVENT_TIME), Tuple3.of((Object)2L, (Object)EVENT_TIMER, (Object)TimeDomain.EVENT_TIME), Tuple3.of((Object)3L, (Object)EVENT_TIMER, (Object)TimeDomain.EVENT_TIME), Tuple3.of((Object)1L, (Object)PROC_TIMER, (Object)TimeDomain.PROCESSING_TIME), Tuple3.of((Object)2L, (Object)PROC_TIMER, (Object)TimeDomain.PROCESSING_TIME), Tuple3.of((Object)3L, (Object)PROC_TIMER, (Object)TimeDomain.PROCESSING_TIME));
        }
    }

    @Test
    public void testNonTimerStatesRestorableByNonProcessesOperator() throws Exception {
        OperatorSubtaskState state;
        Path path = new Path(this.folder.newFolder().toURI());
        KeyedStateBootstrapOperator bootstrapOperator = new KeyedStateBootstrapOperator(0L, path, (KeyedStateBootstrapFunction)new SimpleBootstrapFunction());
        try (KeyedOneInputStreamOperatorTestHarness harness = this.getHarness((OneInputStreamOperator)bootstrapOperator);){
            this.processElements(harness, 1L, 2L, 3L);
            state = this.getState((KeyedStateBootstrapOperator<Long, Long>)bootstrapOperator, harness);
        }
        StreamMap mapOperator = new StreamMap((MapFunction)new StreamingFunction());
        try (KeyedOneInputStreamOperatorTestHarness harness = this.getHarness((OneInputStreamOperator)mapOperator, state);){
            this.processElements(harness, 1L, 2L, 3L);
            this.assertHarnessOutput(harness, 1L, 2L, 3L);
            harness.snapshot(0L, 0L);
        }
    }

    private <T> KeyedOneInputStreamOperatorTestHarness<Long, Long, T> getHarness(OneInputStreamOperator<Long, T> bootstrapOperator) throws Exception {
        return this.getHarness(bootstrapOperator, null);
    }

    private <T> KeyedOneInputStreamOperatorTestHarness<Long, Long, T> getHarness(OneInputStreamOperator<Long, T> bootstrapOperator, OperatorSubtaskState state) throws Exception {
        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(bootstrapOperator, (KeySelector & Serializable)id -> id, Types.LONG, 128, 1, 0);
        harness.setStateBackend((StateBackend)new RocksDBStateBackend(this.folder.newFolder().toURI()));
        if (state != null) {
            harness.initializeState(state);
        }
        harness.open();
        return harness;
    }

    private <T> void processElements(KeyedOneInputStreamOperatorTestHarness<Long, Long, T> harness, Long ... records) throws Exception {
        for (Long record : records) {
            harness.processElement((Object)record, 0L);
        }
    }

    private OperatorSubtaskState getState(KeyedStateBootstrapOperator<Long, Long> bootstrapOperator, KeyedOneInputStreamOperatorTestHarness<Long, Long, TaggedOperatorSubtaskState> harness) throws Exception {
        bootstrapOperator.endInput();
        OperatorSubtaskState state = ((TaggedOperatorSubtaskState)harness.extractOutputValues().get((int)0)).state;
        return state;
    }

    private <T> void assertHarnessOutput(KeyedOneInputStreamOperatorTestHarness<Long, Long, T> harness, T ... output) {
        Assert.assertThat((String)"The output from the operator does not match the expected values", (Object)harness.extractOutputValues(), (Matcher)Matchers.containsInAnyOrder((Object[])output));
    }

    private static class StreamingFunction
    extends RichMapFunction<Long, Long> {
        private ValueState<Long> state;

        private StreamingFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            this.state = this.getRuntimeContext().getState(descriptor);
        }

        public Long map(Long value) throws Exception {
            return (Long)this.state.value();
        }
    }

    private static class SimpleBootstrapFunction
    extends KeyedStateBootstrapFunction<Long, Long> {
        private ValueState<Long> state;

        private SimpleBootstrapFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            this.state = this.getRuntimeContext().getState(descriptor);
        }

        public void processElement(Long value, KeyedStateBootstrapFunction.Context ctx) throws Exception {
            this.state.update((Object)value);
        }
    }

    private static class SimpleProcessFunction
    extends KeyedProcessFunction<Long, Long, Tuple3<Long, Long, TimeDomain>> {
        private SimpleProcessFunction() {
        }

        public void processElement(Long value, KeyedProcessFunction.Context ctx, Collector<Tuple3<Long, Long, TimeDomain>> out) throws Exception {
        }

        public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Tuple3<Long, Long, TimeDomain>> out) throws Exception {
            out.collect((Object)Tuple3.of((Object)ctx.getCurrentKey(), (Object)timestamp, (Object)ctx.timeDomain()));
        }
    }

    private static class TimerBootstrapFunction
    extends KeyedStateBootstrapFunction<Long, Long> {
        private TimerBootstrapFunction() {
        }

        public void processElement(Long value, KeyedStateBootstrapFunction.Context ctx) throws Exception {
            ctx.timerService().registerEventTimeTimer(EVENT_TIMER.longValue());
            ctx.timerService().registerProcessingTimeTimer(PROC_TIMER.longValue());
        }
    }
}

