package org.apache.flink.runtime.query;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/query/KvStateRegistryTest.class */
public class KvStateRegistryTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/query/KvStateRegistryTest$DeepCopyingStringSerializer.class */
    /**
     * Stub {@code TypeSerializer<String>} that {@link DummyKvState} hands out as its value serializer.
     *
     * <p>The only operation with observable behavior is {@link #duplicate()}, which returns a new
     * instance on every call. All instances are equal to each other ({@link #equals(Object)} is a
     * plain type check and {@link #hashCode()} is constant), so distinct-but-equal copies can be
     * told apart from a shared instance by identity. Every other operation is a no-op returning
     * {@code null} or {@code 0}.
     */
    private static class DeepCopyingStringSerializer extends TypeSerializer<String> {
        private static final long serialVersionUID = -3744051158625555607L;

        private DeepCopyingStringSerializer() {
        }

        /** Reports the serialized type as mutable. */
        public boolean isImmutableType() {
            return false;
        }

        /** Returns a fresh serializer instance rather than {@code this}. */
        public TypeSerializer<String> duplicate() {
            return new DeepCopyingStringSerializer();
        }

        /** Not needed by the tests; always returns {@code null}. */
        public String createInstance() {
            return null;
        }

        /** Not needed by the tests; always returns {@code null}. */
        public String copy(String str) {
            return null;
        }

        /** Not needed by the tests; always returns {@code null}. */
        public String copy(String str, String str2) {
            return null;
        }

        /** Always returns {@code 0}. */
        public int getLength() {
            return 0;
        }

        /** No-op: nothing is written to the output view. */
        public void serialize(String str, DataOutputView dataOutputView) throws IOException {
        }

        /** No-op: nothing is read from the input view; always returns {@code null}. */
        public String deserialize(DataInputView dataInputView) throws IOException {
            return null;
        }

        /** No-op: nothing is read from the input view; always returns {@code null}. */
        public String deserialize(String str, DataInputView dataInputView) throws IOException {
            return null;
        }

        /** No-op: nothing is transferred between the two views. */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        }

        /** Two serializers are equal exactly when both are {@code DeepCopyingStringSerializer}s. */
        public boolean equals(Object obj) {
            return obj instanceof DeepCopyingStringSerializer;
        }

        /** Constant hash, consistent with {@link #equals(Object)}. */
        public int hashCode() {
            return 0;
        }

        /** Not needed by the tests; always returns {@code null}. */
        public TypeSerializerConfigSnapshot<String> snapshotConfiguration() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/query/KvStateRegistryTest$DummyKvState.class */
    /**
     * Minimal {@code InternalKvState<Integer, VoidNamespace, String>} used as the state instance
     * that the tests register.
     *
     * <p>It exposes real serializers for key, namespace and value, echoes the request bytes back
     * from {@link #getSerializedValue}, and does nothing (or refuses) for everything else.
     */
    private static class DummyKvState implements InternalKvState<Integer, VoidNamespace, String> {
        private DummyKvState() {
        }

        /** Returns the shared {@code IntSerializer} instance. */
        public TypeSerializer<Integer> getKeySerializer() {
            return IntSerializer.INSTANCE;
        }

        /** Returns the shared {@code VoidNamespaceSerializer} instance. */
        public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
            return VoidNamespaceSerializer.INSTANCE;
        }

        /** Creates a new {@link DeepCopyingStringSerializer} on every call. */
        public TypeSerializer<String> getValueSerializer() {
            return new DeepCopyingStringSerializer();
        }

        /** No-op: the namespace is ignored. */
        public void setCurrentNamespace(VoidNamespace namespace) {
        }

        /** Returns {@code serializedKeyAndNamespace} unchanged; the serializers are ignored. */
        public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<Integer> keySerializer, TypeSerializer<VoidNamespace> namespaceSerializer, TypeSerializer<String> valueSerializer) throws Exception {
            return serializedKeyAndNamespace;
        }

        /** Unsupported: always throws {@link UnsupportedOperationException}. */
        public InternalKvState.StateIncrementalVisitor<Integer, VoidNamespace, String> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
            throw new UnsupportedOperationException();
        }

        /** No-op: there is no state to clear. */
        public void clear() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/query/KvStateRegistryTest$TestingKvStateRegistryListener.class */
    /**
     * {@code KvStateRegistryListener} that records the job ID of every notification it receives:
     * registrations go into one queue, unregistrations into another.
     */
    private static final class TestingKvStateRegistryListener implements KvStateRegistryListener {
        private final Queue<JobID> stateRegisteredNotifications;
        private final Queue<JobID> stateDeregisteredNotifications;

        /**
         * @param registeredSink queue that receives the job ID of each registration notification
         * @param deregisteredSink queue that receives the job ID of each unregistration notification
         */
        private TestingKvStateRegistryListener(Queue<JobID> registeredSink, Queue<JobID> deregisteredSink) {
            stateRegisteredNotifications = registeredSink;
            stateDeregisteredNotifications = deregisteredSink;
        }

        /** Offers {@code jobID} to the registration queue; the other arguments are ignored. */
        public void notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID) {
            stateRegisteredNotifications.offer(jobID);
        }

        /** Offers {@code jobID} to the unregistration queue; the other arguments are ignored. */
        public void notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) {
            stateDeregisteredNotifications.offer(jobID);
        }
    }

    /**
     * Has ten threads each fetch the {@code KvStateInfo} for their own thread from the same
     * registered state and checks that:
     * <ul>
     *   <li>the entry's cache holds ten infos while the threads are parked,</li>
     *   <li>no two threads received the same info instance, yet all infos are equal, and</li>
     *   <li>the cache size is zero after the state has been unregistered.</li>
     * </ul>
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the workers
     */
    @Test
    public void testKvStateEntry() throws InterruptedException {
        final int threadCount = 10;
        final CountDownLatch allInfosCollected = new CountDownLatch(threadCount);
        final CountDownLatch releaseWorkers = new CountDownLatch(1);
        final List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList<>());
        final AtomicReference<Throwable> workerFailure = new AtomicReference<>();

        final JobID jobId = new JobID();
        final JobVertexID jobVertexId = new JobVertexID();
        final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        final KvStateRegistry registry = new KvStateRegistry();
        final KvStateID stateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, "foobar", new DummyKvState(), getClass().getClassLoader());

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                infos.add(registry.getKvState(stateId).getInfoForCurrentThread());
                allInfosCollected.countDown();
                try {
                    // Stay alive until the main thread has inspected the cache.
                    releaseWorkers.await();
                } catch (InterruptedException e) {
                    // compareAndSet keeps the first failure instead of overwriting it.
                    workerFailure.compareAndSet(null, e);
                }
            }).start();
        }

        allInfosCollected.await();
        final KvStateEntry entry;
        try {
            entry = registry.getKvState(stateId);
            Assert.assertEquals(threadCount, infos.size());
            Assert.assertEquals(threadCount, entry.getCacheSize());
        } finally {
            // Release the workers even when a check above fails, so they never stay parked.
            releaseWorkers.countDown();
        }

        // Every info must be identical to exactly one list element (itself) and equal to the rest.
        for (KvStateInfo candidate : infos) {
            boolean seenItself = false;
            for (KvStateInfo other : infos) {
                if (candidate == other) {
                    if (seenItself) {
                        Assert.fail("More than one thread sharing the same serializer instance.");
                    }
                    seenItself = true;
                } else {
                    Assert.assertEquals(candidate, other);
                }
            }
        }

        registry.unregisterKvState(jobId, jobVertexId, keyGroupRange, "foobar", stateId);
        Assert.assertEquals(0L, entry.getCacheSize());

        final Throwable failure = workerFailure.get();
        if (failure != null) {
            // Attach the cause so the worker's stack trace shows up in the test report.
            throw new AssertionError("A worker thread failed: " + failure, failure);
        }
    }

    /**
     * Registers one listener per job for two different jobs and checks that registering and
     * unregistering a state for one job notifies only that job's listener.
     */
    @Test
    public void testKvStateRegistryListenerNotification() {
        final JobID firstJobId = new JobID();
        final JobID secondJobId = new JobID();
        final KvStateRegistry registry = new KvStateRegistry();

        final Queue<JobID> firstRegistered = new ArrayDeque<>(2);
        final Queue<JobID> firstDeregistered = new ArrayDeque<>(2);
        final TestingKvStateRegistryListener firstListener = new TestingKvStateRegistryListener(firstRegistered, firstDeregistered);

        final Queue<JobID> secondRegistered = new ArrayDeque<>(2);
        final Queue<JobID> secondDeregistered = new ArrayDeque<>(2);
        final TestingKvStateRegistryListener secondListener = new TestingKvStateRegistryListener(secondRegistered, secondDeregistered);

        registry.registerListener(firstJobId, firstListener);
        registry.registerListener(secondJobId, secondListener);

        // Registration for the first job reaches only the first listener.
        final JobVertexID firstVertexId = new JobVertexID();
        final KeyGroupRange firstKeyGroupRange = new KeyGroupRange(0, 1);
        final KvStateID firstStateId = registry.registerKvState(firstJobId, firstVertexId, firstKeyGroupRange, "foobar", new DummyKvState(), getClass().getClassLoader());
        Assert.assertThat(firstRegistered.poll(), Matchers.equalTo(firstJobId));
        Assert.assertThat(secondRegistered.isEmpty(), Matchers.is(true));

        // Registration for the second job reaches only the second listener.
        final JobVertexID secondVertexId = new JobVertexID();
        final KeyGroupRange secondKeyGroupRange = new KeyGroupRange(0, 1);
        final KvStateID secondStateId = registry.registerKvState(secondJobId, secondVertexId, secondKeyGroupRange, "barfoo", new DummyKvState(), getClass().getClassLoader());
        Assert.assertThat(secondRegistered.poll(), Matchers.equalTo(secondJobId));
        Assert.assertThat(firstRegistered.isEmpty(), Matchers.is(true));

        // The same isolation must hold for unregistration.
        registry.unregisterKvState(firstJobId, firstVertexId, firstKeyGroupRange, "foobar", firstStateId);
        Assert.assertThat(firstDeregistered.poll(), Matchers.equalTo(firstJobId));
        Assert.assertThat(secondDeregistered.isEmpty(), Matchers.is(true));

        registry.unregisterKvState(secondJobId, secondVertexId, secondKeyGroupRange, "barfoo", secondStateId);
        Assert.assertThat(secondDeregistered.poll(), Matchers.equalTo(secondJobId));
        Assert.assertThat(firstDeregistered.isEmpty(), Matchers.is(true));
    }

    /**
     * Registers one listener under {@code HighAvailabilityServices.DEFAULT_JOB_ID} and another under
     * a concrete job ID, then registers and unregisters a state for that concrete job. The test
     * expects the listener registered under the default job ID to receive both notifications and
     * the job-specific listener to receive none.
     */
    @Test
    public void testLegacyCodePathPreference() {
        final KvStateRegistry registry = new KvStateRegistry();

        final Queue<JobID> defaultRegistered = new ArrayDeque<>(2);
        final Queue<JobID> defaultDeregistered = new ArrayDeque<>(2);
        final TestingKvStateRegistryListener defaultJobListener = new TestingKvStateRegistryListener(defaultRegistered, defaultDeregistered);

        // One queue serves both roles on purpose: a single emptiness check then proves that this
        // listener saw neither a registration nor an unregistration.
        final Queue<JobID> jobSpecificNotifications = new ArrayDeque<>(2);
        final TestingKvStateRegistryListener jobSpecificListener = new TestingKvStateRegistryListener(jobSpecificNotifications, jobSpecificNotifications);

        final JobID jobId = new JobID();
        registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, defaultJobListener);
        registry.registerListener(jobId, jobSpecificListener);

        final JobVertexID jobVertexId = new JobVertexID();
        final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        final KvStateID stateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, "registrationName", new DummyKvState(), getClass().getClassLoader());
        Assert.assertThat(defaultRegistered.poll(), Matchers.equalTo(jobId));
        Assert.assertThat(jobSpecificNotifications.isEmpty(), Matchers.is(true));

        registry.unregisterKvState(jobId, jobVertexId, keyGroupRange, "registrationName", stateId);
        Assert.assertThat(defaultDeregistered.poll(), Matchers.equalTo(jobId));
        Assert.assertThat(jobSpecificNotifications.isEmpty(), Matchers.is(true));
    }
}
