/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.ChangelogAggregatingState;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogListState;
import org.apache.flink.state.changelog.ChangelogMapState;
import org.apache.flink.state.changelog.ChangelogReducingState;
import org.apache.flink.state.changelog.ChangelogValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

/**
 * A keyed state backend that wraps an {@link AbstractKeyedStateBackend}.
 *
 * <p>Most operations are forwarded unchanged to the wrapped backend. Three things are handled
 * here instead of being forwarded as-is:
 *
 * <ul>
 *   <li>{@link #createInternalState} asks the wrapped backend for the state and then wraps the
 *       result in the matching {@code Changelog*State} (see {@link #STATE_FACTORIES}).
 *   <li>{@link #create} wraps the priority queue produced by the wrapped backend in a {@link
 *       ChangelogKeyGroupedPriorityQueue}.
 *   <li>{@link #getOrCreateKeyedState} and {@link #getPartitionedState} keep their own by-name
 *       cache of the (wrapped) states, independent of the wrapped backend.
 * </ul>
 *
 * @param <K> the type of the keys handled by this backend
 */
@Internal
class ChangelogKeyedStateBackend<K>
        implements CheckpointableKeyedStateBackend<K>,
                CheckpointListener,
                TestableKeyedStateBackend {

    /**
     * Maps each supported state descriptor type to the factory that wraps a state of that type.
     *
     * <p>Each method reference is cast to {@link StateFactory}: a method reference has no type of
     * its own, so without the cast the generic {@code Tuple2.of} would give it no
     * functional-interface target type and the expression would not compile.
     */
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
            Stream.of(
                            Tuple2.of(
                                    StateDescriptor.Type.VALUE,
                                    (StateFactory) ChangelogValueState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.LIST,
                                    (StateFactory) ChangelogListState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.REDUCING,
                                    (StateFactory) ChangelogReducingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.AGGREGATING,
                                    (StateFactory) ChangelogAggregatingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.MAP,
                                    (StateFactory) ChangelogMapState::create))
                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

    /** The wrapped backend that almost every call is forwarded to. */
    private final AbstractKeyedStateBackend<K> keyedStateBackend;

    /**
     * States created through {@link #getOrCreateKeyedState}, keyed by state descriptor name. This
     * cache belongs to this class; it is not the wrapped backend's own bookkeeping.
     */
    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;

    /** Used to initialize the serializer of state descriptors that have none set yet. */
    private final ExecutionConfig executionConfig;

    /** Handed to {@link TtlStateFactory} when a state is created. */
    private final TtlTimeProvider ttlTimeProvider;

    /**
     * The state most recently returned by {@link #getPartitionedState}; paired with {@link
     * #lastName}. Kept raw because the namespace type differs from call to call.
     */
    @SuppressWarnings("rawtypes")
    private InternalKvState lastState;

    /** Descriptor name of {@link #lastState}, or {@code null} if nothing is cached. */
    private String lastName;

    /**
     * Creates a backend wrapping {@code keyedStateBackend}.
     *
     * @param keyedStateBackend the backend to forward to
     * @param executionConfig used to initialize state descriptor serializers
     * @param ttlTimeProvider passed to {@link TtlStateFactory} on state creation
     */
    public ChangelogKeyedStateBackend(
            AbstractKeyedStateBackend<K> keyedStateBackend,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider) {
        this.keyedStateBackend = keyedStateBackend;
        this.executionConfig = executionConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.keyValueStatesByName = new HashMap<>();
    }

    // ---------------------------------------------------------------------------------------
    //  Plain forwarding to the wrapped backend
    // ---------------------------------------------------------------------------------------

    /** Returns the key group range of the wrapped backend. */
    public KeyGroupRange getKeyGroupRange() {
        return keyedStateBackend.getKeyGroupRange();
    }

    /**
     * Closes the wrapped backend.
     *
     * @throws IOException if the wrapped backend fails to close
     */
    public void close() throws IOException {
        keyedStateBackend.close();
    }

    /** Sets the current key on the wrapped backend. */
    public void setCurrentKey(K newKey) {
        keyedStateBackend.setCurrentKey(newKey);
    }

    /** Returns the current key of the wrapped backend. */
    public K getCurrentKey() {
        return keyedStateBackend.getCurrentKey();
    }

    /** Returns the key serializer of the wrapped backend. */
    public TypeSerializer<K> getKeySerializer() {
        return keyedStateBackend.getKeySerializer();
    }

    /** Forwards to the wrapped backend's {@code getKeys}. */
    public <N> Stream<K> getKeys(String state, N namespace) {
        return keyedStateBackend.getKeys(state, namespace);
    }

    /** Forwards to the wrapped backend's {@code getKeysAndNamespaces}. */
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        return keyedStateBackend.getKeysAndNamespaces(state);
    }

    /** Disposes the wrapped backend, then drops every state reference cached by this class. */
    public void dispose() {
        keyedStateBackend.dispose();
        lastName = null;
        lastState = null;
        keyValueStatesByName.clear();
    }

    /** Registers {@code listener} on the wrapped backend. */
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        keyedStateBackend.registerKeySelectionListener(listener);
    }

    /**
     * Deregisters {@code listener} from the wrapped backend.
     *
     * @return whatever the wrapped backend returns for the deregistration
     */
    public boolean deregisterKeySelectionListener(
            KeyedStateBackend.KeySelectionListener<K> listener) {
        return keyedStateBackend.deregisterKeySelectionListener(listener);
    }

    /**
     * Forwards to the wrapped backend's {@code applyToAllKeys}, but supplies this object's {@link
     * #getPartitionedState} as the state lookup so the function is handed the states cached
     * here.
     *
     * @throws Exception propagated from the wrapped backend or from {@code function}
     */
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function)
            throws Exception {
        keyedStateBackend.applyToAllKeys(
                namespace,
                namespaceSerializer,
                stateDescriptor,
                function,
                this::getPartitionedState);
    }

    /**
     * Returns the state for {@code stateDescriptor}, with its current namespace set to {@code
     * namespace}.
     *
     * <p>Lookup order: the last state returned by this method (matched by descriptor name only),
     * then the by-name cache, and finally {@link #getOrCreateKeyedState}. Whichever state is
     * returned becomes the new "last" state.
     *
     * @param namespace the namespace to select; must not be {@code null}
     * @param namespaceSerializer only used if the state has to be created
     * @param stateDescriptor identifies the state by its name
     * @return the state, cast to {@code S} without a runtime check
     * @throws Exception if creating the state fails
     */
    @SuppressWarnings("unchecked")
    public <N, S extends State> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespace, "Namespace");

        // Fast path: same descriptor name as the previous call.
        if (lastName != null && lastName.equals(stateDescriptor.getName())) {
            lastState.setCurrentNamespace(namespace);
            return (S) lastState;
        }

        // Already created earlier: promote it to "last accessed".
        InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
        if (previous != null) {
            lastState = previous;
            lastState.setCurrentNamespace(namespace);
            lastName = stateDescriptor.getName();
            return (S) previous;
        }

        // First access: create (and cache by name) the state.
        final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
        lastName = stateDescriptor.getName();
        lastState = kvState;
        kvState.setCurrentNamespace(namespace);
        return state;
    }

    /**
     * Forwards the snapshot request unchanged to the wrapped backend.
     *
     * @throws Exception propagated from the wrapped backend
     */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        return keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    /**
     * Creates a priority queue in the wrapped backend and wraps it in a {@link
     * ChangelogKeyGroupedPriorityQueue}.
     */
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return new ChangelogKeyGroupedPriorityQueue<>(
                keyedStateBackend.create(stateName, byteOrderedElementSerializer));
    }

    /** Returns the entry count reported by the wrapped backend. */
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        return keyedStateBackend.numKeyValueStateEntries();
    }

    /** Forwards to the wrapped backend's {@code isStateImmutableInStateBackend}. */
    public boolean isStateImmutableInStateBackend(CheckpointType checkpointOptions) {
        return keyedStateBackend.isStateImmutableInStateBackend(checkpointOptions);
    }

    /**
     * Forwards the savepoint request to the wrapped backend.
     *
     * @throws Exception propagated from the wrapped backend
     */
    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        return keyedStateBackend.savepoint();
    }

    /**
     * Forwards the completion notification to the wrapped backend.
     *
     * @throws Exception propagated from the wrapped backend
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        keyedStateBackend.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Forwards the abort notification to the wrapped backend.
     *
     * @throws Exception propagated from the wrapped backend
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        keyedStateBackend.notifyCheckpointAborted(checkpointId);
    }

    // ---------------------------------------------------------------------------------------
    //  Methods that do more than forward
    // ---------------------------------------------------------------------------------------

    /**
     * Returns the state registered under the descriptor's name, creating it on first use.
     *
     * <p>On creation the descriptor's serializer is initialized from the execution config if
     * necessary, the state is built through {@link TtlStateFactory} and then passed through
     * {@link LatencyTrackingStateFactory}, stored in the by-name cache, and handed to the wrapped
     * backend's {@code publishQueryableStateIfEnabled}.
     *
     * @param namespaceSerializer must not be {@code null}
     * @param stateDescriptor identifies the state by its name
     * @return the cached or newly created state, cast to {@code S} without a runtime check
     * @throws Exception if creating the state fails
     */
    @SuppressWarnings("unchecked")
    public <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
        Preconditions.checkNotNull(
                getKeySerializer(),
                "State key serializer has not been configured in the config. "
                        + "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            // `this` (not the wrapped backend) is passed as the backend, presumably so the
            // factory creates the state via this class's createInternalState -- confirm against
            // TtlStateFactory.
            kvState =
                    LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(
                            TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                                    namespaceSerializer, stateDescriptor, this, ttlTimeProvider),
                            stateDescriptor,
                            keyedStateBackend.getLatencyTrackingStateConfig());
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }

    /**
     * Creates the state in the wrapped backend and wraps it with the factory registered for the
     * descriptor's type.
     *
     * @return the wrapping state produced by the matching {@link StateFactory}
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's type
     * @throws Exception propagated from the wrapped backend or from the factory
     */
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<S, SV> stateDesc,
            @Nonnull
                    StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                            snapshotTransformFactory)
            throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getType());
        if (stateFactory == null) {
            String message =
                    String.format(
                            "State %s is not supported by %s",
                            stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        // The argument must not be cast to a raw InternalKvState: that would make this an
        // unchecked invocation, erasing the return type so it no longer matches IS.
        return stateFactory.create(
                keyedStateBackend.createInternalState(
                        namespaceSerializer, stateDesc, snapshotTransformFactory));
    }

    /** Wraps a state obtained from the wrapped backend in its {@code Changelog*State}. */
    private interface StateFactory {
        <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> kvState)
                throws Exception;
    }
}

