package org.apache.flink.test.streaming.api.datastream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDeclaration;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDeclaration;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDeclaration;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.class */
class StatefulDataStreamV2ITCase {
    private KeyedPartitionStream<Long, Long> keyedPartitionStream;
    private ExecutionEnvironment env;

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$DefaultKeySelector.class */
    private static class DefaultKeySelector implements KeySelector<Long, Long> {
        private static final long serialVersionUID = 1;

        private DefaultKeySelector() {
        }

        public Long getKey(Long l) throws Exception {
            return l;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockAggregateSumFunction.class */
    private static class MockAggregateSumFunction implements AggregateFunction<Long, Long, Long> {
        private MockAggregateSumFunction() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Long m973createAccumulator() {
            return 0L;
        }

        public Long add(Long l, Long l2) {
            return Long.valueOf(l.longValue() + l2.longValue());
        }

        public Long getResult(Long l) {
            return l;
        }

        public Long merge(Long l, Long l2) {
            return Long.valueOf(l.longValue() + l2.longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockAggregateSumProcessFunction.class */
    private static class MockAggregateSumProcessFunction implements OneInputStreamProcessFunction<Long, String> {
        private final AggregatingStateDeclaration<Long, Long, Long> stateDeclaration = StateDeclarations.aggregatingState("reducing-state", TypeDescriptors.LONG, new MockAggregateSumFunction());

        public void processRecord(Long l, Collector<String> collector, PartitionedContext partitionedContext) throws Exception {
            Optional state = partitionedContext.getStateManager().getState(this.stateDeclaration);
            if (!state.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            ((AggregatingState) state.get()).add(l);
            collector.collect(Long.toString(((Long) ((AggregatingState) state.get()).get()).longValue()));
        }

        public /* bridge */ /* synthetic */ void processRecord(Object obj, Collector collector, PartitionedContext partitionedContext) throws Exception {
            processRecord((Long) obj, (Collector<String>) collector, partitionedContext);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockCountMapProcessFunction.class */
    private static class MockCountMapProcessFunction implements OneInputStreamProcessFunction<Long, String> {
        private final MapStateDeclaration<Long, Long> stateDeclaration = StateDeclarations.mapState("map-state", TypeDescriptors.LONG, TypeDescriptors.LONG);

        public void processRecord(Long l, Collector<String> collector, PartitionedContext partitionedContext) throws Exception {
            Optional state = partitionedContext.getStateManager().getState(this.stateDeclaration);
            if (!state.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            Long l2 = (Long) ((MapState) state.get()).get(l);
            Long valueOf = Long.valueOf(l2 == null ? 1L : l2.longValue() + 1);
            ((MapState) state.get()).put(l, valueOf);
            collector.collect(Long.toString(valueOf.longValue()));
        }

        public /* bridge */ /* synthetic */ void processRecord(Object obj, Collector collector, PartitionedContext partitionedContext) throws Exception {
            processRecord((Long) obj, (Collector<String>) collector, partitionedContext);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockListCountProcessFunction.class */
    private static class MockListCountProcessFunction implements OneInputStreamProcessFunction<Long, String> {
        private final ListStateDeclaration<Long> stateDeclaration = StateDeclarations.listState("list-state", TypeDescriptors.LONG);

        public void processRecord(Long l, Collector<String> collector, PartitionedContext partitionedContext) throws Exception {
            Optional state = partitionedContext.getStateManager().getState(this.stateDeclaration);
            if (!state.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            ListState listState = (ListState) state.get();
            listState.add(l);
            StringBuilder sb = new StringBuilder();
            Iterator it = ((Iterable) listState.get()).iterator();
            while (it.hasNext()) {
                sb.append((Long) it.next());
                sb.append(",");
            }
            if (sb.length() > 0) {
                sb.deleteCharAt(sb.length() - 1);
            }
            collector.collect(sb.toString());
        }

        public /* bridge */ /* synthetic */ void processRecord(Object obj, Collector collector, PartitionedContext partitionedContext) throws Exception {
            processRecord((Long) obj, (Collector<String>) collector, partitionedContext);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockReduceSumFunction.class */
    private static class MockReduceSumFunction implements ReduceFunction<Long> {
        private MockReduceSumFunction() {
        }

        public Long reduce(Long l, Long l2) throws Exception {
            return Long.valueOf(l.longValue() + l2.longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockReducingSumProcessFunction.class */
    private static class MockReducingSumProcessFunction implements OneInputStreamProcessFunction<Long, String> {
        private final ReducingStateDeclaration<Long> stateDeclaration = StateDeclarations.reducingState("reducing-state", TypeDescriptors.LONG, new MockReduceSumFunction());

        public void processRecord(Long l, Collector<String> collector, PartitionedContext partitionedContext) throws Exception {
            Optional state = partitionedContext.getStateManager().getState(this.stateDeclaration);
            if (!state.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            ((ReducingState) state.get()).add(l);
            collector.collect(Long.toString(((Long) ((ReducingState) state.get()).get()).longValue()));
        }

        public /* bridge */ /* synthetic */ void processRecord(Object obj, Collector collector, PartitionedContext partitionedContext) throws Exception {
            processRecord((Long) obj, (Collector<String>) collector, partitionedContext);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockSumProcessFunction.class */
    private static class MockSumProcessFunction implements OneInputStreamProcessFunction<Long, String> {
        private final ValueStateDeclaration<Long> stateDeclaration = StateDeclarations.valueState("value-state", TypeDescriptors.LONG);

        public void processRecord(Long l, Collector<String> collector, PartitionedContext partitionedContext) throws Exception {
            Optional state = partitionedContext.getStateManager().getState(this.stateDeclaration);
            if (!state.isPresent()) {
                throw new FlinkRuntimeException("State not found: " + this.stateDeclaration);
            }
            Long l2 = (Long) ((ValueState) state.get()).value();
            ((ValueState) state.get()).update(Long.valueOf(Long.valueOf(l2 == null ? 0L : l2.longValue()).longValue() + l.longValue()));
            collector.collect(Long.toString(((Long) ((ValueState) state.get()).value()).longValue()));
        }

        public /* bridge */ /* synthetic */ void processRecord(Object obj, Collector collector, PartitionedContext partitionedContext) throws Exception {
            processRecord((Long) obj, (Collector<String>) collector, partitionedContext);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase$MockVerifierFunction.class */
    private static class MockVerifierFunction implements OneInputStreamProcessFunction<String, Object> {
        private final List<Object> allValues;

        public MockVerifierFunction(List<Object> list) {
            this.allValues = new ArrayList(list);
        }

        public void processRecord(String str, Collector<Object> collector, PartitionedContext partitionedContext) throws Exception {
            if (!this.allValues.contains(str)) {
                throw new FlinkRuntimeException("Record not found: " + str);
            }
            this.allValues.remove(str);
        }

        public /* bridge */ /* synthetic */ void processRecord(Object obj, Collector collector, PartitionedContext partitionedContext) throws Exception {
            processRecord((String) obj, (Collector<Object>) collector, partitionedContext);
        }
    }

    StatefulDataStreamV2ITCase() {
    }

    @BeforeEach
    void setUp() throws ReflectiveOperationException {
        DefaultKeySelector defaultKeySelector = new DefaultKeySelector();
        this.env = ExecutionEnvironment.getInstance();
        this.keyedPartitionStream = this.env.fromSource(DataStreamV2SourceUtils.fromData(Arrays.asList(1L, 1L, 1L)), "test-source").keyBy(defaultKeySelector);
    }

    @Test
    void testValueState() throws Exception {
        this.keyedPartitionStream.process(new MockSumProcessFunction()).global().process(new MockVerifierFunction(Arrays.asList("1", "2", "3")));
        this.env.execute("dsV2 job");
    }

    @Test
    void testListState() throws Exception {
        this.keyedPartitionStream.process(new MockListCountProcessFunction()).global().process(new MockVerifierFunction(Arrays.asList("1", "1,1", "1,1,1")));
        this.env.execute("dsV2 job");
    }

    @Test
    void testMapState() throws Exception {
        this.keyedPartitionStream.process(new MockCountMapProcessFunction()).global().process(new MockVerifierFunction(Arrays.asList("1", "2", "3")));
        this.env.execute("dsV2 job");
    }

    @Test
    void testReducingState() throws Exception {
        this.keyedPartitionStream.process(new MockReducingSumProcessFunction()).global().process(new MockVerifierFunction(Arrays.asList("1", "2", "3")));
        this.env.execute("dsV2 job");
    }

    @Test
    void testAggregatingState() throws Exception {
        this.keyedPartitionStream.process(new MockAggregateSumProcessFunction()).global().process(new MockVerifierFunction(Arrays.asList("1", "2", "3")));
        this.env.execute("dsV2 job");
    }
}
