/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.joda.time.Instant;

public class FlinkStateInternals<K>
implements StateInternals {
    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
    private Coder<K> keyCoder;
    private final TreeMap<Long, Integer> watermarkHolds = new TreeMap();
    private final MapStateDescriptor<String, Instant> watermarkHoldStateDescriptor = new MapStateDescriptor("watermark-holds", (TypeSerializer)StringSerializer.INSTANCE, new CoderTypeSerializer(InstantCoder.of()));

    public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, Coder<K> keyCoder) throws Exception {
        this.flinkStateBackend = flinkStateBackend;
        this.keyCoder = keyCoder;
        this.restoreWatermarkHoldsView();
    }

    public Long minWatermarkHoldMs() {
        if (this.watermarkHolds.isEmpty()) {
            return Long.MAX_VALUE;
        }
        return this.watermarkHolds.firstKey();
    }

    public K getKey() {
        ByteBuffer keyBytes = (ByteBuffer)this.flinkStateBackend.getCurrentKey();
        return FlinkKeyUtils.decodeKey(keyBytes, this.keyCoder);
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<T> address, StateContext<?> context) {
        return (T)address.getSpec().bind(address.getId(), (StateBinder)new FlinkStateBinder(namespace, context));
    }

    private void addWatermarkHoldUsage(Instant watermarkHold) {
        this.watermarkHolds.merge(watermarkHold.getMillis(), 1, Integer::sum);
    }

    private void removeWatermarkHoldUsage(Instant watermarkHold) {
        this.watermarkHolds.compute(watermarkHold.getMillis(), (hold, usage) -> {
            Objects.requireNonNull(usage);
            return usage == 1 ? null : Integer.valueOf(usage - 1);
        });
    }

    private void restoreWatermarkHoldsView() throws Exception {
        org.apache.flink.api.common.state.MapState mapState = (org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this.watermarkHoldStateDescriptor);
        try (Stream keys = this.flinkStateBackend.getKeys(this.watermarkHoldStateDescriptor.getName(), (Object)VoidNamespace.INSTANCE);){
            Iterator iterator = keys.iterator();
            while (iterator.hasNext()) {
                this.flinkStateBackend.setCurrentKey((Object)((ByteBuffer)iterator.next()));
                mapState.values().forEach(this::addWatermarkHoldUsage);
            }
        }
    }

    public static class EarlyBinder
    implements StateBinder {
        private final KeyedStateBackend keyedStateBackend;

        public EarlyBinder(KeyedStateBackend keyedStateBackend) {
            this.keyedStateBackend = keyedStateBackend;
        }

        public <T> org.apache.beam.sdk.state.ValueState<T> bindValue(String id, StateSpec<org.apache.beam.sdk.state.ValueState<T>> spec, Coder<T> coder) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(id, new CoderTypeSerializer<T>(coder)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }

        public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ListStateDescriptor(id, new CoderTypeSerializer<T>(elemCoder)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }

        public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(id, new CoderTypeSerializer<T>(elemCoder), (TypeSerializer)VoidSerializer.INSTANCE));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }

        public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(String id, StateSpec<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(id, new CoderTypeSerializer<KeyT>(mapKeyCoder), new CoderTypeSerializer<ValueT>(mapValueCoder)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }

        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(String id, StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(id, new CoderTypeSerializer<AccumT>(accumCoder)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }

        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(String id, StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(id, new CoderTypeSerializer<AccumT>(accumCoder)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }

        public WatermarkHoldState bindWatermark(String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
            try {
                this.keyedStateBackend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor("watermark-holds", (TypeSerializer)StringSerializer.INSTANCE, new CoderTypeSerializer(InstantCoder.of())));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return null;
        }
    }

    private static class FlinkSetState<T>
    implements SetState<T> {
        private final StateNamespace namespace;
        private final String stateId;
        private final MapStateDescriptor<T, Boolean> flinkStateDescriptor;
        private final KeyedStateBackend<ByteBuffer> flinkStateBackend;

        FlinkSetState(KeyedStateBackend<ByteBuffer> flinkStateBackend, String stateId, StateNamespace namespace, Coder<T> coder) {
            this.namespace = namespace;
            this.stateId = stateId;
            this.flinkStateBackend = flinkStateBackend;
            this.flinkStateDescriptor = new MapStateDescriptor(stateId, new CoderTypeSerializer<T>(coder), (TypeSerializer)new BooleanSerializer());
        }

        public ReadableState<Boolean> contains(T t) {
            try {
                Boolean result = (Boolean)((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).get(t);
                return ReadableStates.immediate((Object)(result != null && result != false ? 1 : 0));
            }
            catch (Exception e) {
                throw new RuntimeException("Error contains value from state.", e);
            }
        }

        public ReadableState<Boolean> addIfAbsent(T t) {
            try {
                org.apache.flink.api.common.state.MapState state = (org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                boolean alreadyContained = state.contains(t);
                if (!alreadyContained) {
                    state.put(t, (Object)true);
                }
                return ReadableStates.immediate((Object)(!alreadyContained ? 1 : 0));
            }
            catch (Exception e) {
                throw new RuntimeException("Error addIfAbsent value to state.", e);
            }
        }

        public void remove(T t) {
            try {
                ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).remove(t);
            }
            catch (Exception e) {
                throw new RuntimeException("Error remove value to state.", e);
            }
        }

        public SetState<T> readLater() {
            return this;
        }

        public void add(T value) {
            try {
                ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).put(value, (Object)true);
            }
            catch (Exception e) {
                throw new RuntimeException("Error add value to state.", e);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        Iterable result = ((org.apache.flink.api.common.state.MapState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).keys();
                        return result == null || Iterables.isEmpty((Iterable)result);
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error isEmpty from state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public Iterable<T> read() {
            try {
                Iterable result = ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).keys();
                return result != null ? ImmutableList.copyOf((Iterable)result) : Collections.emptyList();
            }
            catch (Exception e) {
                throw new RuntimeException("Error read from state.", e);
            }
        }

        public void clear() {
            try {
                ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkSetState that = (FlinkSetState)o;
            return this.namespace.equals(that.namespace) && this.stateId.equals(that.stateId);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.stateId.hashCode();
            return result;
        }
    }

    private static class FlinkMapState<KeyT, ValueT>
    implements MapState<KeyT, ValueT> {
        private final StateNamespace namespace;
        private final String stateId;
        private final MapStateDescriptor<KeyT, ValueT> flinkStateDescriptor;
        private final KeyedStateBackend<ByteBuffer> flinkStateBackend;

        FlinkMapState(KeyedStateBackend<ByteBuffer> flinkStateBackend, String stateId, StateNamespace namespace, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
            this.namespace = namespace;
            this.stateId = stateId;
            this.flinkStateBackend = flinkStateBackend;
            this.flinkStateDescriptor = new MapStateDescriptor(stateId, new CoderTypeSerializer<KeyT>(mapKeyCoder), new CoderTypeSerializer<ValueT>(mapValueCoder));
        }

        public ReadableState<ValueT> get(KeyT input) {
            try {
                return ReadableStates.immediate((Object)((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).get(input));
            }
            catch (Exception e) {
                throw new RuntimeException("Error get from state.", e);
            }
        }

        public void put(KeyT key, ValueT value) {
            try {
                ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).put(key, value);
            }
            catch (Exception e) {
                throw new RuntimeException("Error put kv to state.", e);
            }
        }

        public ReadableState<ValueT> putIfAbsent(KeyT key, ValueT value) {
            try {
                Object current = ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).get(key);
                if (current == null) {
                    ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).put(key, value);
                }
                return ReadableStates.immediate((Object)current);
            }
            catch (Exception e) {
                throw new RuntimeException("Error put kv to state.", e);
            }
        }

        public void remove(KeyT key) {
            try {
                ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).remove(key);
            }
            catch (Exception e) {
                throw new RuntimeException("Error remove map state key.", e);
            }
        }

        public ReadableState<Iterable<KeyT>> keys() {
            return new ReadableState<Iterable<KeyT>>(){

                public Iterable<KeyT> read() {
                    try {
                        Iterable result = ((org.apache.flink.api.common.state.MapState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).keys();
                        return result != null ? ImmutableList.copyOf((Iterable)result) : Collections.emptyList();
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error get map state keys.", e);
                    }
                }

                public ReadableState<Iterable<KeyT>> readLater() {
                    return this;
                }
            };
        }

        public ReadableState<Iterable<ValueT>> values() {
            return new ReadableState<Iterable<ValueT>>(){

                public Iterable<ValueT> read() {
                    try {
                        Iterable result = ((org.apache.flink.api.common.state.MapState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).values();
                        return result != null ? ImmutableList.copyOf((Iterable)result) : Collections.emptyList();
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error get map state values.", e);
                    }
                }

                public ReadableState<Iterable<ValueT>> readLater() {
                    return this;
                }
            };
        }

        public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
            return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>(){

                public Iterable<Map.Entry<KeyT, ValueT>> read() {
                    try {
                        Iterable result = ((org.apache.flink.api.common.state.MapState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).entries();
                        return result != null ? ImmutableList.copyOf((Iterable)result) : Collections.emptyList();
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error get map state entries.", e);
                    }
                }

                public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            try {
                ((org.apache.flink.api.common.state.MapState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkMapState that = (FlinkMapState)o;
            return this.namespace.equals(that.namespace) && this.stateId.equals(that.stateId);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.stateId.hashCode();
            return result;
        }
    }

    private class FlinkWatermarkHoldState
    implements WatermarkHoldState {
        private final TimestampCombiner timestampCombiner;
        private final String namespaceString;
        private org.apache.flink.api.common.state.MapState<String, Instant> watermarkHoldsState;

        public FlinkWatermarkHoldState(KeyedStateBackend<ByteBuffer> flinkStateBackend, MapStateDescriptor<String, Instant> watermarkHoldStateDescriptor, String stateId, StateNamespace namespace, TimestampCombiner timestampCombiner) {
            this.timestampCombiner = timestampCombiner;
            this.namespaceString = namespace.stringKey() + stateId;
            try {
                this.watermarkHoldsState = (org.apache.flink.api.common.state.MapState)flinkStateBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, watermarkHoldStateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not access state for watermark partition view");
            }
        }

        public TimestampCombiner getTimestampCombiner() {
            return this.timestampCombiner;
        }

        public WatermarkHoldState readLater() {
            return this;
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        return FlinkWatermarkHoldState.this.watermarkHoldsState.get((Object)FlinkWatermarkHoldState.this.namespaceString) == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void add(Instant value) {
            try {
                Instant current = (Instant)this.watermarkHoldsState.get((Object)this.namespaceString);
                if (current == null) {
                    FlinkStateInternals.this.addWatermarkHoldUsage(value);
                    this.watermarkHoldsState.put((Object)this.namespaceString, (Object)value);
                } else {
                    Instant combined = this.timestampCombiner.combine(new Instant[]{current, value});
                    if (combined.getMillis() != current.getMillis()) {
                        FlinkStateInternals.this.removeWatermarkHoldUsage(current);
                        FlinkStateInternals.this.addWatermarkHoldUsage(combined);
                        this.watermarkHoldsState.put((Object)this.namespaceString, (Object)combined);
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error updating state.", e);
            }
        }

        public Instant read() {
            try {
                return (Instant)this.watermarkHoldsState.get((Object)this.namespaceString);
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public void clear() {
            Instant current = this.read();
            if (current != null) {
                FlinkStateInternals.this.removeWatermarkHoldUsage(current);
            }
            try {
                this.watermarkHoldsState.remove((Object)this.namespaceString);
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkWatermarkHoldState that = (FlinkWatermarkHoldState)o;
            if (!this.timestampCombiner.equals((Object)that.timestampCombiner)) {
                return false;
            }
            return this.namespaceString.equals(that.namespaceString);
        }

        public int hashCode() {
            int result = this.namespaceString.hashCode();
            result = 31 * result + this.timestampCombiner.hashCode();
            return result;
        }
    }

    private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
    implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final String stateId;
        private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
        private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
        private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
        private final CombineWithContext.Context context;

        FlinkCombiningStateWithContext(KeyedStateBackend<ByteBuffer> flinkStateBackend, String stateId, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, CombineWithContext.Context context) {
            this.namespace = namespace;
            this.stateId = stateId;
            this.combineFn = combineFn;
            this.flinkStateBackend = flinkStateBackend;
            this.context = context;
            this.flinkStateDescriptor = new ValueStateDescriptor(stateId, new CoderTypeSerializer<AccumT>(accumCoder));
        }

        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public void add(InputT value) {
            try {
                ValueState state = (ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Object current = state.value();
                if (current == null) {
                    current = this.combineFn.createAccumulator(this.context);
                }
                current = this.combineFn.addInput(current, value, this.context);
                state.update(current);
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public void addAccum(AccumT accum) {
            try {
                ValueState state = (ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Object current = state.value();
                if (current == null) {
                    state.update(accum);
                } else {
                    current = this.combineFn.mergeAccumulators((Iterable)Lists.newArrayList((Object[])new Object[]{current, accum}), this.context);
                    state.update(current);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public AccumT getAccum() {
            try {
                Object accum = ((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).value();
                return (AccumT)(accum != null ? accum : this.combineFn.createAccumulator(this.context));
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators, this.context);
        }

        public OutputT read() {
            try {
                ValueState state = (ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Object accum = state.value();
                if (accum != null) {
                    return (OutputT)this.combineFn.extractOutput(accum, this.context);
                }
                return (OutputT)this.combineFn.extractOutput(this.combineFn.createAccumulator(this.context), this.context);
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        return ((ValueState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).value() == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            try {
                ((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkCombiningStateWithContext that = (FlinkCombiningStateWithContext)o;
            return this.namespace.equals(that.namespace) && this.stateId.equals(that.stateId);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.stateId.hashCode();
            return result;
        }
    }

    private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
    implements CombiningState<InputT, AccumT, OutputT> {
        private final StateNamespace namespace;
        private final String stateId;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
        private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
        private final KeyedStateBackend<ByteBuffer> flinkStateBackend;

        FlinkCombiningState(KeyedStateBackend<ByteBuffer> flinkStateBackend, String stateId, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder) {
            this.namespace = namespace;
            this.stateId = stateId;
            this.combineFn = combineFn;
            this.flinkStateBackend = flinkStateBackend;
            this.flinkStateDescriptor = new ValueStateDescriptor(stateId, new CoderTypeSerializer<AccumT>(accumCoder));
        }

        public CombiningState<InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public void add(InputT value) {
            try {
                ValueState state = (ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Object current = state.value();
                if (current == null) {
                    current = this.combineFn.createAccumulator();
                }
                current = this.combineFn.addInput(current, value);
                state.update(current);
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public void addAccum(AccumT accum) {
            try {
                ValueState state = (ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Object current = state.value();
                if (current == null) {
                    state.update(accum);
                } else {
                    current = this.combineFn.mergeAccumulators((Iterable)Lists.newArrayList((Object[])new Object[]{current, accum}));
                    state.update(current);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to state.", e);
            }
        }

        public AccumT getAccum() {
            try {
                Object accum = ((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).value();
                return (AccumT)(accum != null ? accum : this.combineFn.createAccumulator());
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators);
        }

        public OutputT read() {
            try {
                ValueState state = (ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Object accum = state.value();
                if (accum != null) {
                    return (OutputT)this.combineFn.extractOutput(accum);
                }
                return (OutputT)this.combineFn.extractOutput(this.combineFn.createAccumulator());
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        return ((ValueState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).value() == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            try {
                ((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkCombiningState that = (FlinkCombiningState)o;
            return this.namespace.equals(that.namespace) && this.stateId.equals(that.stateId);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.stateId.hashCode();
            return result;
        }
    }

    private static class FlinkBagState<K, T>
    implements BagState<T> {
        private final StateNamespace namespace;
        private final String stateId;
        private final ListStateDescriptor<T> flinkStateDescriptor;
        private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
        private final boolean storesVoidValues;

        FlinkBagState(KeyedStateBackend<ByteBuffer> flinkStateBackend, String stateId, StateNamespace namespace, Coder<T> coder) {
            this.namespace = namespace;
            this.stateId = stateId;
            this.flinkStateBackend = flinkStateBackend;
            this.storesVoidValues = coder instanceof VoidCoder;
            this.flinkStateDescriptor = new ListStateDescriptor(stateId, new CoderTypeSerializer<T>(coder));
        }

        public void add(T input) {
            try {
                ListState partitionedState = (ListState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                if (this.storesVoidValues) {
                    Preconditions.checkState((input == null ? 1 : 0) != 0, (String)"Expected to a null value but was: %s", input);
                    input = VoidCoder.of().structuralValue((Void)input);
                }
                partitionedState.add(input);
            }
            catch (Exception e) {
                throw new RuntimeException("Error adding to bag state.", e);
            }
        }

        public BagState<T> readLater() {
            return this;
        }

        @Nonnull
        public Iterable<T> read() {
            try {
                ListState partitionedState = (ListState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor);
                Iterable result = (Iterable)partitionedState.get();
                if (this.storesVoidValues) {
                    return () -> {
                        final Iterator underlying = result.iterator();
                        return new Iterator<T>(){

                            @Override
                            public boolean hasNext() {
                                return underlying.hasNext();
                            }

                            @Override
                            public T next() {
                                underlying.next();
                                return null;
                            }
                        };
                    };
                }
                return result != null ? ImmutableList.copyOf((Iterable)result) : Collections.emptyList();
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    try {
                        Iterable result = (Iterable)((ListState)flinkStateBackend.getPartitionedState((Object)namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)flinkStateDescriptor)).get();
                        return result == null;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public void clear() {
            try {
                ((ListState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkBagState that = (FlinkBagState)o;
            return this.namespace.equals(that.namespace) && this.stateId.equals(that.stateId);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.stateId.hashCode();
            return result;
        }
    }

    private static class FlinkValueState<T>
    implements org.apache.beam.sdk.state.ValueState<T> {
        private final StateNamespace namespace;
        private final String stateId;
        private final ValueStateDescriptor<T> flinkStateDescriptor;
        private final KeyedStateBackend<ByteBuffer> flinkStateBackend;

        FlinkValueState(KeyedStateBackend<ByteBuffer> flinkStateBackend, String stateId, StateNamespace namespace, Coder<T> coder) {
            this.namespace = namespace;
            this.stateId = stateId;
            this.flinkStateBackend = flinkStateBackend;
            this.flinkStateDescriptor = new ValueStateDescriptor(stateId, new CoderTypeSerializer<T>(coder));
        }

        public void write(T input) {
            try {
                ((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).update(input);
            }
            catch (Exception e) {
                throw new RuntimeException("Error updating state.", e);
            }
        }

        public org.apache.beam.sdk.state.ValueState<T> readLater() {
            return this;
        }

        public T read() {
            try {
                return (T)((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).value();
            }
            catch (Exception e) {
                throw new RuntimeException("Error reading state.", e);
            }
        }

        public void clear() {
            try {
                ((ValueState)this.flinkStateBackend.getPartitionedState((Object)this.namespace.stringKey(), (TypeSerializer)StringSerializer.INSTANCE, this.flinkStateDescriptor)).clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Error clearing state.", e);
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FlinkValueState that = (FlinkValueState)o;
            return this.namespace.equals(that.namespace) && this.stateId.equals(that.stateId);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.stateId.hashCode();
            return result;
        }
    }

    private class FlinkStateBinder
    implements StateBinder {
        private final StateNamespace namespace;
        private final StateContext<?> stateContext;

        private FlinkStateBinder(StateNamespace namespace, StateContext<?> stateContext) {
            this.namespace = namespace;
            this.stateContext = stateContext;
        }

        public <T2> org.apache.beam.sdk.state.ValueState<T2> bindValue(String id, StateSpec<org.apache.beam.sdk.state.ValueState<T2>> spec, Coder<T2> coder) {
            return new FlinkValueState<T2>((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, id, this.namespace, coder);
        }

        public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, Coder<T2> elemCoder) {
            return new FlinkBagState((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, id, this.namespace, elemCoder);
        }

        public <T2> SetState<T2> bindSet(String id, StateSpec<SetState<T2>> spec, Coder<T2> elemCoder) {
            return new FlinkSetState<T2>((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, id, this.namespace, elemCoder);
        }

        public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(String id, StateSpec<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
            return new FlinkMapState<KeyT, ValueT>((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, id, this.namespace, mapKeyCoder, mapValueCoder);
        }

        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(String id, StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            return new FlinkCombiningState((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, id, combineFn, this.namespace, accumCoder);
        }

        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(String id, StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
            return new FlinkCombiningStateWithContext((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, id, combineFn, this.namespace, accumCoder, CombineContextFactory.createFromStateContext(this.stateContext));
        }

        public WatermarkHoldState bindWatermark(String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
            return new FlinkWatermarkHoldState((KeyedStateBackend<ByteBuffer>)FlinkStateInternals.this.flinkStateBackend, (MapStateDescriptor<String, Instant>)FlinkStateInternals.this.watermarkHoldStateDescriptor, id, this.namespace, timestampCombiner);
        }
    }
}

