/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api.datastream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDeclaration;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDeclaration;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDeclaration;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeDescriptor;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class StatefulDataStreamV2ITCase {
    private KeyedPartitionStream<Long, Long> keyedPartitionStream;
    private ExecutionEnvironment env;

    StatefulDataStreamV2ITCase() {
    }

    @BeforeEach
    void setUp() throws ReflectiveOperationException {
        DefaultKeySelector defaultKeySelector = new DefaultKeySelector();
        this.env = ExecutionEnvironment.getInstance();
        this.keyedPartitionStream = this.env.fromSource(DataStreamV2SourceUtils.fromData(Arrays.asList(1L, 1L, 1L)), "test-source").keyBy((KeySelector)defaultKeySelector);
    }

    @Test
    void testValueState() throws Exception {
        MockSumProcessFunction processFunction = new MockSumProcessFunction();
        MockVerifierFunction verifierFunction = new MockVerifierFunction(Arrays.asList("1", "2", "3"));
        this.keyedPartitionStream.process((OneInputStreamProcessFunction)processFunction).global().process((OneInputStreamProcessFunction)verifierFunction);
        this.env.execute("dsV2 job");
    }

    @Test
    void testListState() throws Exception {
        MockListCountProcessFunction processFunction = new MockListCountProcessFunction();
        MockVerifierFunction verifierFunction = new MockVerifierFunction(Arrays.asList("1", "1,1", "1,1,1"));
        this.keyedPartitionStream.process((OneInputStreamProcessFunction)processFunction).global().process((OneInputStreamProcessFunction)verifierFunction);
        this.env.execute("dsV2 job");
    }

    @Test
    void testMapState() throws Exception {
        MockCountMapProcessFunction processFunction = new MockCountMapProcessFunction();
        MockVerifierFunction verifierFunction = new MockVerifierFunction(Arrays.asList("1", "2", "3"));
        this.keyedPartitionStream.process((OneInputStreamProcessFunction)processFunction).global().process((OneInputStreamProcessFunction)verifierFunction);
        this.env.execute("dsV2 job");
    }

    @Test
    void testReducingState() throws Exception {
        MockReducingSumProcessFunction processFunction = new MockReducingSumProcessFunction();
        MockVerifierFunction verifierFunction = new MockVerifierFunction(Arrays.asList("1", "2", "3"));
        this.keyedPartitionStream.process((OneInputStreamProcessFunction)processFunction).global().process((OneInputStreamProcessFunction)verifierFunction);
        this.env.execute("dsV2 job");
    }

    @Test
    void testAggregatingState() throws Exception {
        MockAggregateSumProcessFunction processFunction = new MockAggregateSumProcessFunction();
        MockVerifierFunction verifierFunction = new MockVerifierFunction(Arrays.asList("1", "2", "3"));
        this.keyedPartitionStream.process((OneInputStreamProcessFunction)processFunction).global().process((OneInputStreamProcessFunction)verifierFunction);
        this.env.execute("dsV2 job");
    }

    private static class MockVerifierFunction
    implements OneInputStreamProcessFunction<String, Object> {
        private final List<Object> allValues;

        public MockVerifierFunction(List<Object> allValues) {
            this.allValues = new ArrayList<Object>(allValues);
        }

        public void processRecord(String record, Collector<Object> output, PartitionedContext ctx) throws Exception {
            if (!this.allValues.contains(record)) {
                throw new FlinkRuntimeException("Record not found: " + record);
            }
            this.allValues.remove(record);
        }
    }

    private static class MockSumProcessFunction
    implements OneInputStreamProcessFunction<Long, String> {
        private final ValueStateDeclaration<Long> stateDeclaration = StateDeclarations.valueState((String)"value-state", (TypeDescriptor)TypeDescriptors.LONG);

        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) throws Exception {
            Optional maybeState = ctx.getStateManager().getState(this.stateDeclaration);
            if (!maybeState.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            Long currentValue = (Long)((ValueState)maybeState.get()).value();
            currentValue = currentValue == null ? 0L : currentValue;
            ((ValueState)maybeState.get()).update((Object)(currentValue + record));
            output.collect((Object)Long.toString((Long)((ValueState)maybeState.get()).value()));
        }
    }

    private static class MockListCountProcessFunction
    implements OneInputStreamProcessFunction<Long, String> {
        private final ListStateDeclaration<Long> stateDeclaration = StateDeclarations.listState((String)"list-state", (TypeDescriptor)TypeDescriptors.LONG);

        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) throws Exception {
            Optional maybeState = ctx.getStateManager().getState(this.stateDeclaration);
            if (!maybeState.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            ListState currentValue = (ListState)maybeState.get();
            currentValue.add((Object)record);
            StringBuilder stringBuilder = new StringBuilder();
            for (Long val : (Iterable)currentValue.get()) {
                stringBuilder.append(val);
                stringBuilder.append(",");
            }
            if (stringBuilder.length() > 0) {
                stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            }
            output.collect((Object)stringBuilder.toString());
        }
    }

    private static class MockCountMapProcessFunction
    implements OneInputStreamProcessFunction<Long, String> {
        private final MapStateDeclaration<Long, Long> stateDeclaration = StateDeclarations.mapState((String)"map-state", (TypeDescriptor)TypeDescriptors.LONG, (TypeDescriptor)TypeDescriptors.LONG);

        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) throws Exception {
            Optional maybeState = ctx.getStateManager().getState(this.stateDeclaration);
            if (!maybeState.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            Long curOccurence = (Long)((MapState)maybeState.get()).get((Object)record);
            curOccurence = curOccurence == null ? 1L : curOccurence + 1L;
            ((MapState)maybeState.get()).put((Object)record, (Object)curOccurence);
            output.collect((Object)Long.toString(curOccurence));
        }
    }

    private static class MockReducingSumProcessFunction
    implements OneInputStreamProcessFunction<Long, String> {
        private final ReducingStateDeclaration<Long> stateDeclaration = StateDeclarations.reducingState((String)"reducing-state", (TypeDescriptor)TypeDescriptors.LONG, (ReduceFunction)new MockReduceSumFunction());

        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) throws Exception {
            Optional maybeState = ctx.getStateManager().getState(this.stateDeclaration);
            if (!maybeState.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            ((ReducingState)maybeState.get()).add((Object)record);
            output.collect((Object)Long.toString((Long)((ReducingState)maybeState.get()).get()));
        }
    }

    private static class MockAggregateSumProcessFunction
    implements OneInputStreamProcessFunction<Long, String> {
        private final AggregatingStateDeclaration<Long, Long, Long> stateDeclaration = StateDeclarations.aggregatingState((String)"reducing-state", (TypeDescriptor)TypeDescriptors.LONG, (AggregateFunction)new MockAggregateSumFunction());

        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) throws Exception {
            Optional maybeState = ctx.getStateManager().getState(this.stateDeclaration);
            if (!maybeState.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            ((AggregatingState)maybeState.get()).add((Object)record);
            output.collect((Object)Long.toString((Long)((AggregatingState)maybeState.get()).get()));
        }
    }

    private static class MockReduceSumFunction
    implements ReduceFunction<Long> {
        private MockReduceSumFunction() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    private static class MockAggregateSumFunction
    implements AggregateFunction<Long, Long, Long> {
        private MockAggregateSumFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            return value + accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    private static class DefaultKeySelector
    implements KeySelector<Long, Long> {
        private static final long serialVersionUID = 1L;

        private DefaultKeySelector() {
        }

        public Long getKey(Long value) throws Exception {
            return value;
        }
    }
}

