package org.apache.flink.state.api.input;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/state/api/input/MultiStateKeyIteratorTest.class */
public class MultiStateKeyIteratorTest {
    private static final List<ValueStateDescriptor<Integer>> descriptors = new ArrayList(2);

    private static AbstractKeyedStateBackend<Integer> createKeyedStateBackend() {
        return new MockStateBackend().createKeyedStateBackend(new DummyEnvironment(), new JobID(), "mock-backend", IntSerializer.INSTANCE, 129, KeyGroupRange.of(0, 128), (TaskKvStateRegistry) null, TtlTimeProvider.DEFAULT, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), Collections.emptyList(), new CloseableRegistry());
    }

    private static void setKey(AbstractKeyedStateBackend<Integer> abstractKeyedStateBackend, ValueStateDescriptor<Integer> valueStateDescriptor, int i) throws Exception {
        abstractKeyedStateBackend.setCurrentKey(Integer.valueOf(i));
        abstractKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor).update(0);
    }

    @Test
    public void testIteratorPullsKeyFromAllDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend();
        setKey(createKeyedStateBackend, descriptors.get(0), 1);
        setKey(createKeyedStateBackend, descriptors.get(1), 2);
        MultiStateKeyIterator multiStateKeyIterator = new MultiStateKeyIterator(descriptors, createKeyedStateBackend);
        ArrayList arrayList = new ArrayList();
        while (multiStateKeyIterator.hasNext()) {
            arrayList.add(multiStateKeyIterator.next());
        }
        Assert.assertEquals("Unexpected number of keys", 2L, arrayList.size());
        Assert.assertEquals("Unexpected keys found", Arrays.asList(1, 2), arrayList);
    }

    @Test
    public void testIteratorRemovesFromAllDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend();
        setKey(createKeyedStateBackend, descriptors.get(0), 1);
        setKey(createKeyedStateBackend, descriptors.get(1), 1);
        MultiStateKeyIterator multiStateKeyIterator = new MultiStateKeyIterator(descriptors, createKeyedStateBackend);
        Assert.assertEquals("Unexpected keys pulled from state backend", 1L, ((Integer) multiStateKeyIterator.next()).intValue());
        multiStateKeyIterator.remove();
        Assert.assertFalse("Failed to drop key from all descriptors in state backend", multiStateKeyIterator.hasNext());
        Iterator<ValueStateDescriptor<Integer>> it = descriptors.iterator();
        while (it.hasNext()) {
            Assert.assertEquals("Failed to drop key for state descriptor", 0L, createKeyedStateBackend.getKeys(it.next().getName(), VoidNamespace.INSTANCE).count());
        }
    }

    static {
        descriptors.add(new ValueStateDescriptor<>("state-1", Types.INT));
        descriptors.add(new ValueStateDescriptor<>("state-2", Types.INT));
    }
}
