/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.changelog.ChangelogAggregatingState;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.changelog.ChangelogListState;
import org.apache.flink.state.changelog.ChangelogMapState;
import org.apache.flink.state.changelog.ChangelogReducingState;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.ChangelogValueState;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.StateChangeLogger;
import org.apache.flink.util.FlinkRuntimeException;

/**
 * Creates changelog wrappers around internal states and keeps a by-name registry of every
 * wrapper it has produced.
 *
 * <p>Key/value states and priority-queue states are tracked in two separate maps. Both maps are
 * plain {@link HashMap}s and this class performs no synchronization of its own.
 */
public class ChangelogStateFactory {
    /** Key/value changelog states created by this factory, keyed by state name. */
    private final Map<String, ChangelogState> changelogStates = new HashMap<>();

    /** Priority-queue changelog states created by this factory, keyed by state name. */
    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName =
            new HashMap<>();

    /**
     * Dispatch table from descriptor type to the function that builds the matching changelog
     * state.
     *
     * <p>Each method reference is cast to {@link StateFactory} explicitly: {@code Tuple2.of} is
     * generic in both arguments, so without the cast the method reference has no functional
     * interface target type and cannot be compiled.
     */
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
            Stream.of(
                            Tuple2.of(
                                    StateDescriptor.Type.VALUE,
                                    (StateFactory) ChangelogValueState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.LIST,
                                    (StateFactory) ChangelogListState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.REDUCING,
                                    (StateFactory) ChangelogReducingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.AGGREGATING,
                                    (StateFactory) ChangelogAggregatingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.MAP,
                                    (StateFactory) ChangelogMapState::create))
                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

    /**
     * Builds the changelog state matching the descriptor's type and registers it under the
     * descriptor's name, replacing any state previously registered under that name.
     *
     * @param stateDescriptor descriptor supplying the state type and the registration name
     * @param internalKvState internal state handed to the type-specific factory
     * @param kvStateChangeLogger change logger handed to the type-specific factory
     * @param keyContext key context handed to the type-specific factory
     * @return the newly created changelog state
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's type
     * @throws Exception if the type-specific factory fails
     */
    public <K, N, V, S extends State> ChangelogState create(
            StateDescriptor<S, V> stateDescriptor,
            InternalKvState<K, N, V> internalKvState,
            KvStateChangeLogger<V, N> kvStateChangeLogger,
            InternalKeyContext<K> keyContext)
            throws Exception {
        StateFactory stateFactory = this.getStateFactory(stateDescriptor);
        ChangelogState changelogState =
                (ChangelogState)
                        stateFactory.create(internalKvState, kvStateChangeLogger, keyContext);
        this.changelogStates.put(stateDescriptor.getName(), changelogState);
        return changelogState;
    }

    /**
     * Wraps the given priority queue in a {@link ChangelogKeyGroupedPriorityQueue} and registers
     * the wrapper under {@code stateName}, replacing any queue previously registered under that
     * name.
     *
     * @param stateName name to register the queue under
     * @param internalPriorityQueue queue passed to the wrapper's constructor
     * @param logger change logger passed to the wrapper's constructor
     * @param serializer element serializer passed to the wrapper's constructor
     * @return the newly created wrapper
     */
    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
            String stateName,
            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
            StateChangeLogger<T, Void> logger,
            TypeSerializer<T> serializer) {
        ChangelogKeyGroupedPriorityQueue<T> changelogQueue =
                new ChangelogKeyGroupedPriorityQueue<>(internalPriorityQueue, logger, serializer);
        this.priorityQueueStatesByName.put(stateName, changelogQueue);
        return changelogQueue;
    }

    /**
     * Looks up a state previously created by this factory.
     *
     * @param name name the state was registered under
     * @param type selects which registry to search: {@code KEY_VALUE} or {@code PRIORITY_QUEUE}
     * @return the registered state, or {@code null} if nothing is registered under {@code name}
     *     in the selected registry
     * @throws UnsupportedOperationException if {@code type} is any other backend state type
     */
    public ChangelogState getExistingState(String name, StateMetaInfoSnapshot.BackendStateType type)
            throws UnsupportedOperationException {
        switch (type) {
            case KEY_VALUE:
                return this.changelogStates.get(name);
            case PRIORITY_QUEUE:
                return this.priorityQueueStatesByName.get(name);
            default:
                throw new UnsupportedOperationException(
                        String.format("Unknown state type %s (%s)", type, name));
        }
    }

    /**
     * Calls {@code resetWritingMetaFlag()} on every registered state: first all key/value states,
     * then all priority-queue states.
     */
    public void resetAllWritingMetaFlags() {
        for (ChangelogState keyValueState : this.changelogStates.values()) {
            keyValueState.resetWritingMetaFlag();
        }
        for (ChangelogKeyGroupedPriorityQueue<?> queueState :
                this.priorityQueueStatesByName.values()) {
            queueState.resetWritingMetaFlag();
        }
    }

    /**
     * Empties both registries. The states themselves are only dropped from the maps; nothing is
     * invoked on them here.
     */
    public void dispose() {
        this.changelogStates.clear();
        this.priorityQueueStatesByName.clear();
    }

    /**
     * Resolves the factory for the descriptor's type.
     *
     * @param stateDescriptor descriptor whose type selects the factory
     * @return the matching factory, never {@code null}
     * @throws FlinkRuntimeException if the descriptor's type has no entry in the dispatch table
     */
    private <S extends State, V> StateFactory getStateFactory(
            StateDescriptor<S, V> stateDescriptor) {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getType());
        if (stateFactory == null) {
            String message =
                    String.format(
                            "State %s is not supported by %s",
                            stateDescriptor.getClass(), ChangelogKeyedStateBackend.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory;
    }

    /**
     * Function shape shared by the static {@code create} methods of the concrete changelog state
     * classes listed in the dispatch table.
     */
    @FunctionalInterface
    private interface StateFactory {
        /**
         * Creates a changelog state around the given internal state.
         *
         * @param kvState internal state to wrap
         * @param changeLogger logger the created state is given
         * @param keyContext key context the created state is given
         * @return the created state
         * @throws Exception if creation fails
         */
        <K, N, SV, S extends State, IS extends S> IS create(
                InternalKvState<K, N, SV> kvState,
                KvStateChangeLogger<SV, N> changeLogger,
                InternalKeyContext<K> keyContext)
                throws Exception;
    }
}

