package org.apache.flink.runtime.state.ttl.mock;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformers;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;

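/**
 * In-memory mock keyed state backend from the TTL mock package: state values are kept in nested
 * {@link Map}s (state name, then key, then an inner map) and a snapshot is a copy of those maps.
 *
 * <p>Decompiled from org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.class.
 *
 * @param <K> type of the keys
 */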
public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    /**
     * Dispatch table from a state descriptor class to the factory that builds the matching mock
     * internal state.
     *
     * <p>Every method reference is cast to {@link StateFactory} explicitly. {@code Tuple2.of} is
     * generic, so without the cast the reference has no functional-interface target type and the
     * initializer does not compile.
     */
    @SuppressWarnings({"deprecation", "rawtypes"})
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
        Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (StateFactory) MockInternalValueState::createState),
            Tuple2.of(ListStateDescriptor.class, (StateFactory) MockInternalListState::createState),
            Tuple2.of(MapStateDescriptor.class, (StateFactory) MockInternalMapState::createState),
            Tuple2.of(ReducingStateDescriptor.class, (StateFactory) MockInternalReducingState::createState),
            Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) MockInternalAggregatingState::createState),
            Tuple2.of(FoldingStateDescriptor.class, (StateFactory) MockInternalFoldingState::createState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));

    /**
     * Live state values: state name, then current key, then an inner map. The inner map's keys are
     * what {@code getKeys} matches its namespace argument against.
     */
    private final Map<String, Map<K, Map<Object, Object>>> stateValues;

    /**
     * Snapshot transformer per state name, applied to each value when a snapshot copy is made.
     * An entry may be {@code null}, in which case values are copied unfiltered.
     */
    private final Map<String, StateSnapshotTransformer<Object>> stateSnapshotFilters;

    /* loaded from: input_file:org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend$MockKeyedStateHandle.class */
    static class MockKeyedStateHandle<K> implements KeyedStateHandle {
        private static final long serialVersionUID = 1;
        final Map<String, Map<K, Map<Object, Object>>> snapshotStates;

        MockKeyedStateHandle(Map<String, Map<K, Map<Object, Object>>> map) {
            this.snapshotStates = map;
        }

        public void discardState() {
            this.snapshotStates.clear();
        }

        public long getStateSize() {
            throw new UnsupportedOperationException();
        }

        public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
        }

        public KeyGroupRange getKeyGroupRange() {
            throw new UnsupportedOperationException();
        }

        public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Factory for one kind of mock internal state; the values of {@code STATE_FACTORIES} are of
     * this type.
     *
     * <p>Decompiler notes: the interface was {@code private} in the class file and was widened by
     * JADX. Loaded from
     * org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend$StateFactory.class.
     */
    public interface StateFactory {
        /**
         * Creates the internal state object for the given descriptor.
         *
         * <p>The signature is the erased form. Per the JADX warning, the class file declares it as
         * {@code <N, SV, S extends State, IS extends S> IS createInternalState(TypeSerializer<N>,
         * StateDescriptor<S, SV>)}. The raw form is kept here on purpose.
         * NOTE(review): restoring the generic form should be checked against the sibling
         * {@code createState} methods first; it only works if they are generic as well.
         *
         * @param typeSerializer serializer passed through to the created state
         * @param stateDescriptor descriptor of the state to create
         * @return the created state
         * @throws Exception if the state cannot be created
         */
        State createInternalState(TypeSerializer typeSerializer, StateDescriptor stateDescriptor) throws Exception;
    }

    /**
     * Creates the mock backend on top of caller-supplied state maps.
     *
     * <p>All arguments except {@code map} and {@code map2} are forwarded unchanged to the
     * {@link AbstractKeyedStateBackend} constructor. Both maps are stored by reference, not copied.
     *
     * <p>Decompiler note: this constructor was package-private in the class file and was widened
     * by JADX.
     *
     * @param taskKvStateRegistry forwarded to the superclass
     * @param typeSerializer key serializer, forwarded to the superclass
     * @param classLoader forwarded to the superclass
     * @param i forwarded to the superclass (presumably the number of key groups; confirm against
     *     the superclass constructor)
     * @param keyGroupRange forwarded to the superclass
     * @param executionConfig forwarded to the superclass
     * @param ttlTimeProvider forwarded to the superclass
     * @param map backing store for state values: state name, then key, then inner map
     * @param map2 backing store for the per-state snapshot transformers
     * @param closeableRegistry forwarded to the superclass
     */
    public MockKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, Map<String, Map<K, Map<Object, Object>>> map, Map<String, StateSnapshotTransformer<Object>> map2, CloseableRegistry closeableRegistry) {
        super(taskKvStateRegistry, typeSerializer, classLoader, i, keyGroupRange, executionConfig, ttlTimeProvider, closeableRegistry);
        this.stateSnapshotFilters = map2;
        this.stateValues = map;
    }

    /**
     * Creates the mock internal state for a descriptor and wires it to this backend's value store.
     *
     * <p>Steps: look up the factory for the descriptor's class, create the state, record the
     * snapshot transformer under the state's name, and install a value supplier that resolves the
     * inner map for the state name and the backend's current key on every call, creating missing
     * maps on demand.
     *
     * <p>The signature is the erased form. Per the JADX warning, the class file declares
     * {@code <N, SV, SEV, S extends State, IS extends S> IS createInternalState(TypeSerializer<N>,
     * StateDescriptor<S, SV>, StateSnapshotTransformFactory<SEV>)}.
     *
     * @param typeSerializer serializer handed to the state factory
     * @param stateDescriptor descriptor of the state to create; its class selects the factory
     * @param stateSnapshotTransformFactory source of the optional snapshot transformer
     * @return the created state
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's class
     * @throws Exception if the factory fails to create the state
     */
    @Nonnull
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public State createInternalState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory stateSnapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
        if (stateFactory == null) {
            // Name this backend: the lookup that failed is this class's own factory table.
            throw new FlinkRuntimeException(String.format("State %s is not supported by %s", stateDescriptor.getClass(), MockKeyedStateBackend.class));
        }
        State state = stateFactory.createInternalState(typeSerializer, stateDescriptor);
        String stateName = stateDescriptor.getName();
        this.stateSnapshotFilters.put(stateName, getStateSnapshotTransformer(stateDescriptor, stateSnapshotTransformFactory));
        // The factory's declared return type is State; the value supplier lives on the mock base
        // class, so an explicit cast is needed to reach it.
        ((MockInternalKvState) state).values = () -> this.stateValues
            .computeIfAbsent(stateName, name -> new HashMap<>())
            .computeIfAbsent(getCurrentKey(), key -> new HashMap<>());
        return state;
    }

    /**
     * Builds the snapshot transformer to use for a state, adapting the element-level transformer
     * to the state's shape.
     *
     * @param stateDescriptor descriptor of the state; list and map descriptors get a wrapping
     *     transformer, anything else uses the transformer as is
     * @param stateSnapshotTransformFactory factory asked for the transformer for deserialized state
     * @return the transformer, or {@code null} when the factory provides none
     */
    @SuppressWarnings("unchecked")
    private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(StateDescriptor<?, SV> stateDescriptor, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> stateSnapshotTransformFactory) {
        Optional<StateSnapshotTransformer<SEV>> original = stateSnapshotTransformFactory.createForDeserializedState();
        if (!original.isPresent()) {
            return null;
        }
        StateSnapshotTransformer<SEV> elementTransformer = original.get();
        if (stateDescriptor instanceof ListStateDescriptor) {
            return (StateSnapshotTransformer<SV>) new StateSnapshotTransformers.ListStateSnapshotTransformer<>(elementTransformer);
        }
        if (stateDescriptor instanceof MapStateDescriptor) {
            return (StateSnapshotTransformer<SV>) new StateSnapshotTransformers.MapStateSnapshotTransformer<>(elementTransformer);
        }
        return (StateSnapshotTransformer<SV>) elementTransformer;
    }

    /**
     * Counts the entries of all inner maps, across every state name and every key.
     *
     * @return the sum of the sizes of all innermost maps in the value store
     */
    @Override
    public int numKeyValueStateEntries() {
        int total = 0;
        for (Map<K, Map<Object, Object>> valuesByKey : this.stateValues.values()) {
            for (Map<Object, Object> innerValues : valuesByKey.values()) {
                total += innerValues.size();
            }
        }
        return total;
    }

    /**
     * Reports that this mock does not need legacy synchronous timer snapshots.
     *
     * @return always {@code false}
     */
    @Override
    public boolean requiresLegacySynchronousTimerSnapshots() {
        return false;
    }

    /**
     * Intentionally a no-op: the mock takes no action when a checkpoint completes.
     *
     * @param j identifier passed with the notification; ignored
     */
    @Override
    public void notifyCheckpointComplete(long j) {
        // nothing to do
    }

    /**
     * Streams every key of the named state whose inner map contains the given namespace.
     *
     * <p>A state name with no entry in the value store yields an empty stream instead of a
     * {@link NullPointerException}, so callers may query states that were never created.
     *
     * @param str name of the state to inspect
     * @param n namespace that a key's inner map must contain for the key to be returned
     * @return lazily evaluated stream of matching keys; empty if the state is unknown
     */
    @Override
    public <N> Stream<K> getKeys(String str, N n) {
        Map<K, Map<Object, Object>> valuesByKey = this.stateValues.get(str);
        if (valuesByKey == null) {
            return Stream.empty();
        }
        return valuesByKey.entrySet().stream()
            .filter(entry -> entry.getValue().containsKey(n))
            .map(Map.Entry::getKey);
    }

    /**
     * Returns a not-yet-started task that, when run, copies the current state values (applying the
     * registered snapshot transformers) and wraps the copy in a {@link MockKeyedStateHandle}.
     *
     * <p>The copy is taken when the task runs, not when this method is called. None of the
     * arguments are used.
     *
     * @param j ignored
     * @param j2 ignored
     * @param checkpointStreamFactory ignored
     * @param checkpointOptions ignored
     * @return the task producing the snapshot result
     */
    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        return new FutureTask<SnapshotResult<KeyedStateHandle>>(() -> {
            Map<String, Map<K, Map<Object, Object>>> snapshotStates = copy(this.stateValues, this.stateSnapshotFilters);
            KeyedStateHandle handle = new MockKeyedStateHandle<>(snapshotStates);
            return SnapshotResult.of(handle);
        });
    }

    /**
     * Produces a snapshot copy of the given state values.
     *
     * <p>The result has one entry per state name and, below that, one entry per key of the source,
     * even when every value of that key is filtered out. Individual values are copied by
     * {@code copyEntry}, which applies the state's transformer when one is registered.
     *
     * <p>Decompiler note: this method was package-private in the class file and was widened by
     * JADX.
     *
     * @param map source values: state name, then key, then inner map
     * @param map2 snapshot transformer per state name; a missing or {@code null} entry means the
     *     state's values are copied unfiltered
     * @param <K> type of the keys
     * @return a new map structure that shares no map instances with {@code map}
     */
    public static <K> Map<String, Map<K, Map<Object, Object>>> copy(Map<String, Map<K, Map<Object, Object>>> map, Map<String, StateSnapshotTransformer<Object>> map2) {
        // Typed on purpose: with a raw HashMap the nested maps come back as Object and cannot be
        // assigned to the typed locals below.
        Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>();
        for (Map.Entry<String, Map<K, Map<Object, Object>>> stateEntry : map.entrySet()) {
            String stateName = stateEntry.getKey();
            StateSnapshotTransformer<Object> transformer = map2.get(stateName);
            Map<K, Map<Object, Object>> copiedByKey = new HashMap<>();
            snapshotStates.put(stateName, copiedByKey);
            for (Map.Entry<K, Map<Object, Object>> keyEntry : stateEntry.getValue().entrySet()) {
                K key = keyEntry.getKey();
                Map<Object, Object> copiedValues = new HashMap<>();
                copiedByKey.put(key, copiedValues);
                for (Object namespace : keyEntry.getValue().keySet()) {
                    copyEntry(map, copiedValues, stateName, key, namespace, transformer);
                }
            }
        }
        return snapshotStates;
    }

    /**
     * Copies a single value from the source structure into the target inner map.
     *
     * <p>List and map values are shallow-copied first so the snapshot does not alias the live
     * collection. The copy is then passed through the transformer, if any. A {@code null} result
     * (a {@code null} source value, or a transformer that returns {@code null}) is not stored.
     *
     * @param source live values: state name, then key, then inner map
     * @param target inner map of the snapshot that receives the copied value
     * @param stateName name of the state the value belongs to
     * @param key key the value belongs to
     * @param namespace inner-map key of the value; also used as the key in {@code target}
     * @param transformer transformer to apply, or {@code null} to copy unfiltered
     */
    private static <K> void copyEntry(Map<String, Map<K, Map<Object, Object>>> source, Map<Object, Object> target, String stateName, K key, Object namespace, StateSnapshotTransformer<Object> transformer) {
        Object liveValue = source.get(stateName).get(key).get(namespace);

        Object detached;
        if (liveValue instanceof List) {
            detached = new ArrayList<Object>((List<?>) liveValue);
        } else if (liveValue instanceof Map) {
            detached = new HashMap<Object, Object>((Map<?, ?>) liveValue);
        } else {
            detached = liveValue;
        }

        Object snapshotValue = detached;
        if (transformer != null) {
            snapshotValue = transformer.filterOrTransform(detached);
        }
        if (snapshotValue == null) {
            return;
        }
        target.put(namespace, snapshotValue);
    }

    /**
     * Creates a new heap priority queue set for this backend's key-group range.
     *
     * <p>Neither argument is used. The queue orders elements with the comparator for
     * priority-comparable objects and extracts keys with the extractor for keyed objects. The two
     * integer constructor arguments are both {@code 0}.
     * NOTE(review): their meaning is defined by {@code HeapPriorityQueueSet}; confirm there.
     *
     * @param str ignored
     * @param typeSerializer ignored
     * @param <T> element type of the queue
     * @return a new, empty queue
     */
    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        PriorityComparator<T> byPriority = PriorityComparator.forPriorityComparableObjects();
        KeyExtractorFunction<T> keyOf = KeyExtractorFunction.forKeyedObjects();
        return new HeapPriorityQueueSet<>(byPriority, keyOf, 0, this.keyGroupRange, 0);
    }
}
