/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;

/**
 * A {@link FoldingState} implemented on top of a wrapped {@link ValueState} that stores the
 * accumulator.
 *
 * <p>Key, namespace, snapshot and dispose calls are forwarded to the wrapped state.
 * {@link #add(Object)} reads the current accumulator from the wrapped state, applies the
 * {@link FoldFunction} to it and the new value, and writes the result back.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <T> type of the values folded into the accumulator
 * @param <ACC> type of the accumulator held in the wrapped state
 * @param <Backend> type of the state backend
 * @param <W> type of the wrapped state; it must be both a {@link ValueState} and a
 *     {@link KvState}
 */
public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend, W extends ValueState<ACC> & KvState<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend>>
implements FoldingState<T, ACC>,
KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
    /** The value state that actually stores the accumulator. */
    private final W wrappedState;
    /** The function used to fold each added value into the accumulator. */
    private final FoldFunction<T, ACC> foldFunction;

    /**
     * Creates a folding state that keeps its accumulator in {@code wrappedState}.
     *
     * @param wrappedState the value state holding the accumulator; must also implement
     *     {@link KvState}
     * @param foldFunction the function that combines the accumulator with each added value
     * @throws IllegalArgumentException if {@code wrappedState} is not a {@link KvState}
     *     (this includes {@code null})
     */
    public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) {
        if (!(wrappedState instanceof KvState)) {
            throw new IllegalArgumentException("Wrapped state must be a KvState.");
        }
        // W is erased to ValueState, so the compiler cannot verify this cast; the instanceof
        // check above guards the KvState half of the intersection bound at runtime.
        @SuppressWarnings("unchecked")
        W typedState = (W) wrappedState;
        this.wrappedState = typedState;
        this.foldFunction = foldFunction;
    }

    /** Forwards the current key to the wrapped state. */
    @Override
    public void setCurrentKey(K key) {
        this.wrappedState.setCurrentKey(key);
    }

    /** Forwards the current namespace to the wrapped state. */
    @Override
    public void setCurrentNamespace(N namespace) {
        this.wrappedState.setCurrentNamespace(namespace);
    }

    /**
     * Snapshots the wrapped state and wraps the result together with the fold function.
     *
     * @param checkpointId passed through to the wrapped state's {@code snapshot}
     * @param timestamp passed through to the wrapped state's {@code snapshot}
     * @return a snapshot that restores to a new {@code GenericFoldingState}
     * @throws Exception if snapshotting the wrapped state fails
     */
    @Override
    public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> snapshot(long checkpointId, long timestamp) throws Exception {
        KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot =
                this.wrappedState.snapshot(checkpointId, timestamp);
        return new Snapshot<>(wrappedSnapshot, this.foldFunction);
    }

    /** Disposes the wrapped state. */
    @Override
    public void dispose() {
        this.wrappedState.dispose();
    }

    /**
     * Returns the accumulator as currently stored in the wrapped state.
     *
     * @return the value reported by the wrapped state
     * @throws Exception if reading the wrapped state fails
     */
    @Override
    public ACC get() throws Exception {
        return this.wrappedState.value();
    }

    /**
     * Folds {@code value} into the stored accumulator and writes the result back.
     *
     * @param value the value to fold in
     * @throws Exception if the wrapped state access or the fold function fails
     */
    @Override
    public void add(T value) throws Exception {
        ACC currentValue = this.wrappedState.value();
        this.wrappedState.update(this.foldFunction.fold(currentValue, value));
    }

    /** Clears the wrapped state. */
    @Override
    public void clear() {
        this.wrappedState.clear();
    }

    /**
     * Snapshot of a {@link GenericFoldingState}: the wrapped value-state snapshot plus the fold
     * function needed to rebuild the folding state on restore.
     */
    private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend>
    implements KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
        private static final long serialVersionUID = 1L;
        private final KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot;
        private final FoldFunction<T, ACC> foldFunction;

        public Snapshot(KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot, FoldFunction<T, ACC> foldFunction) {
            this.wrappedSnapshot = wrappedSnapshot;
            this.foldFunction = foldFunction;
        }

        /**
         * Restores the wrapped value state and wraps it in a new {@link GenericFoldingState}
         * using the stored fold function. All arguments are passed through unchanged to the
         * wrapped snapshot's {@code restoreState}.
         *
         * @throws Exception if restoring the wrapped state fails
         */
        @Override
        public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restoreState(Backend stateBackend, TypeSerializer<K> keySerializer, ClassLoader classLoader, long recoveryTimestamp) throws Exception {
            // The restored KvState is statically unrelated to ValueState<ACC>, hence the cast.
            @SuppressWarnings("unchecked")
            ValueState<ACC> restoredValueState = (ValueState<ACC>) this.wrappedSnapshot.restoreState(
                    stateBackend, keySerializer, classLoader, recoveryTimestamp);

            // The W type argument is an intersection type that cannot be written out here, so
            // the raw constructor is used deliberately; the constructor itself verifies that
            // the restored state is a KvState.
            @SuppressWarnings({"unchecked", "rawtypes"})
            KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restored =
                    new GenericFoldingState(restoredValueState, this.foldFunction);
            return restored;
        }

        /** Discards the wrapped snapshot's state. */
        @Override
        public void discardState() throws Exception {
            this.wrappedSnapshot.discardState();
        }

        /** Returns the state size reported by the wrapped snapshot. */
        @Override
        public long getStateSize() throws Exception {
            return this.wrappedSnapshot.getStateSize();
        }
    }
}

