package org.apache.flink.state.api.input;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.mock.MockRestoreOperation;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/state/api/input/MultiStateKeyIteratorTest.class */
public class MultiStateKeyIteratorTest {
    private static final List<ValueStateDescriptor<Integer>> descriptors = new ArrayList(2);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/state/api/input/MultiStateKeyIteratorTest$CountingKeysKeyedStateBackend.class */
    public static class CountingKeysKeyedStateBackend extends AbstractKeyedStateBackend<Integer> {
        int numberOfKeysGenerated;
        public long numberOfKeysEnumerated;

        public CountingKeysKeyedStateBackend(int i, TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<Integer> typeSerializer, ClassLoader classLoader, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, CloseableRegistry closeableRegistry, InternalKeyContext<Integer> internalKeyContext) {
            super(taskKvStateRegistry, typeSerializer, classLoader, executionConfig, ttlTimeProvider, latencyTrackingStateConfig, closeableRegistry, internalKeyContext);
            this.numberOfKeysGenerated = i;
            this.numberOfKeysEnumerated = 0L;
        }

        public <N> Stream<Integer> getKeys(String str, N n) {
            return IntStream.range(0, this.numberOfKeysGenerated).boxed().peek(num -> {
                this.numberOfKeysEnumerated++;
            });
        }

        public int numKeyValueStateEntries() {
            return this.numberOfKeysGenerated;
        }

        @Nonnull
        public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        public void notifyCheckpointComplete(long j) {
        }

        @Nonnull
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        /* JADX WARN: Incorrect return type in method signature: <N:Ljava/lang/Object;SV:Ljava/lang/Object;SEV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/runtime/state/StateSnapshotTransformer$StateSnapshotTransformFactory<TSEV;>;)TIS; */
        @Nonnull
        public State createOrUpdateInternalState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory stateSnapshotTransformFactory) throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        public <N> Stream<Tuple2<Integer, N>> getKeysAndNamespaces(String str) {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        @Nonnull
        public SavepointResources<Integer> savepoint() throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }
    }

    private static AbstractKeyedStateBackend<Integer> createKeyedStateBackend() {
        return new MockStateBackend().createKeyedStateBackend(new KeyedStateBackendParametersImpl(new DummyEnvironment(), new JobID(), "mock-backend", IntSerializer.INSTANCE, 129, KeyGroupRange.of(0, 128), (TaskKvStateRegistry) null, TtlTimeProvider.DEFAULT, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), Collections.emptyList(), new CloseableRegistry()));
    }

    private static CountingKeysKeyedStateBackend createCountingKeysKeyedStateBackend(Integer num) {
        DummyEnvironment dummyEnvironment = new DummyEnvironment();
        IntSerializer intSerializer = IntSerializer.INSTANCE;
        KeyGroupRange of = KeyGroupRange.of(0, 128);
        TtlTimeProvider ttlTimeProvider = TtlTimeProvider.DEFAULT;
        List emptyList = Collections.emptyList();
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        new MockRestoreOperation(emptyList, new HashMap()).restore();
        return new CountingKeysKeyedStateBackend(num.intValue(), null, StateSerializerProvider.fromNewRegisteredSerializer(intSerializer).currentSchemaSerializer(), dummyEnvironment.getUserCodeClassLoader().asClassLoader(), dummyEnvironment.getExecutionConfig(), ttlTimeProvider, LatencyTrackingStateConfig.disabled(), closeableRegistry, new InternalKeyContextImpl(of, 129));
    }

    private static void setKey(AbstractKeyedStateBackend<Integer> abstractKeyedStateBackend, ValueStateDescriptor<Integer> valueStateDescriptor, int i) throws Exception {
        abstractKeyedStateBackend.setCurrentKey(Integer.valueOf(i));
        abstractKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor).update(0);
    }

    private static void clearKey(AbstractKeyedStateBackend<Integer> abstractKeyedStateBackend, ValueStateDescriptor<Integer> valueStateDescriptor, int i) throws Exception {
        abstractKeyedStateBackend.setCurrentKey(Integer.valueOf(i));
        abstractKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor).clear();
    }

    @Test
    public void testIteratorPullsKeyFromAllDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend();
        setKey(createKeyedStateBackend, descriptors.get(0), 1);
        setKey(createKeyedStateBackend, descriptors.get(1), 2);
        MultiStateKeyIterator multiStateKeyIterator = new MultiStateKeyIterator(descriptors, createKeyedStateBackend);
        ArrayList arrayList = new ArrayList();
        while (multiStateKeyIterator.hasNext()) {
            arrayList.add(multiStateKeyIterator.next());
        }
        Assert.assertEquals("Unexpected number of keys", 2L, arrayList.size());
        Assert.assertEquals("Unexpected keys found", Arrays.asList(1, 2), arrayList);
    }

    @Test
    public void testIteratorSkipsEmptyDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend();
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(new ValueStateDescriptor("state-1", Types.INT));
        arrayList.add(new ValueStateDescriptor("state-2", Types.INT));
        arrayList.add(new ValueStateDescriptor("state-3", Types.INT));
        setKey(createKeyedStateBackend, (ValueStateDescriptor) arrayList.get(0), 1);
        setKey(createKeyedStateBackend, (ValueStateDescriptor) arrayList.get(1), 12);
        clearKey(createKeyedStateBackend, (ValueStateDescriptor) arrayList.get(1), 12);
        setKey(createKeyedStateBackend, (ValueStateDescriptor) arrayList.get(2), 2);
        MultiStateKeyIterator multiStateKeyIterator = new MultiStateKeyIterator(arrayList, createKeyedStateBackend);
        ArrayList arrayList2 = new ArrayList();
        while (multiStateKeyIterator.hasNext()) {
            arrayList2.add(multiStateKeyIterator.next());
        }
        Assert.assertEquals("Unexpected number of keys", 2L, arrayList2.size());
        Assert.assertEquals("Unexpected keys found", Arrays.asList(1, 2), arrayList2);
    }

    @Test
    public void testIteratorRemovesFromAllDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend();
        setKey(createKeyedStateBackend, descriptors.get(0), 1);
        setKey(createKeyedStateBackend, descriptors.get(1), 1);
        MultiStateKeyIterator multiStateKeyIterator = new MultiStateKeyIterator(descriptors, createKeyedStateBackend);
        Assert.assertEquals("Unexpected keys pulled from state backend", 1L, ((Integer) multiStateKeyIterator.next()).intValue());
        multiStateKeyIterator.remove();
        Assert.assertFalse("Failed to drop key from all descriptors in state backend", multiStateKeyIterator.hasNext());
        Iterator<ValueStateDescriptor<Integer>> it = descriptors.iterator();
        while (it.hasNext()) {
            Assert.assertEquals("Failed to drop key for state descriptor", 0L, createKeyedStateBackend.getKeys(it.next().getName(), VoidNamespace.INSTANCE).count());
        }
    }

    @Test
    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
        CountingKeysKeyedStateBackend createCountingKeysKeyedStateBackend = createCountingKeysKeyedStateBackend(100000000);
        new MultiStateKeyIterator(descriptors, createCountingKeysKeyedStateBackend).hasNext();
        Assert.assertEquals("Unexpected number of keys enumerated", 1L, createCountingKeysKeyedStateBackend.numberOfKeysEnumerated);
    }

    static {
        descriptors.add(new ValueStateDescriptor<>("state-1", Types.INT));
        descriptors.add(new ValueStateDescriptor<>("state-2", Types.INT));
    }
}
