package org.apache.flink.queryablestate.network;

import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.CombinableMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/queryablestate/network/ClientTest.class */
public class ClientTest extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
    private NioEventLoopGroup nioGroup;

    /* loaded from: input_file:org/apache/flink/queryablestate/network/ClientTest$ChannelDataCollectingHandler.class */
    private static class ChannelDataCollectingHandler extends ChannelInboundHandlerAdapter {
        private final AtomicReference<Channel> channel;
        private final LinkedBlockingQueue<ByteBuf> received;

        private ChannelDataCollectingHandler(AtomicReference<Channel> atomicReference, LinkedBlockingQueue<ByteBuf> linkedBlockingQueue) {
            this.channel = atomicReference;
            this.received = linkedBlockingQueue;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            this.channel.set(channelHandlerContext.channel());
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            this.received.add((ByteBuf) obj);
        }
    }

    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/flink/queryablestate/network/ClientTest$RespondingChannelHandler.class */
    private static final class RespondingChannelHandler extends ChannelInboundHandlerAdapter {
        private final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
        private final byte[] serializedResult;

        private RespondingChannelHandler(MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer, byte[] bArr) {
            this.serializer = messageSerializer;
            this.serializedResult = bArr;
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            ByteBuf byteBuf = (ByteBuf) obj;
            Assert.assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(byteBuf));
            long requestId = MessageSerializer.getRequestId(byteBuf);
            this.serializer.deserializeRequest(byteBuf);
            byteBuf.release();
            channelHandlerContext.channel().writeAndFlush(MessageSerializer.serializeResponse(channelHandlerContext.alloc(), requestId, new KvStateResponse(this.serializedResult)));
        }
    }

    /** Creates the event loop group that the test server channels are bound to. */
    @Before
    public void setUp() throws Exception {
        nioGroup = new NioEventLoopGroup();
    }

    /**
     * Initiates shutdown of the server event loop group, if one was created.
     *
     * <p>The future returned by the shutdown call is not awaited.
     */
    @After
    public void tearDown() throws Exception {
        if (nioGroup == null) {
            return;
        }
        nioGroup.shutdownGracefully();
    }

    /**
     * Sends a batch of requests to a collecting test server, answers every even-numbered request
     * with a response and every odd-numbered one with a request failure, and verifies that the
     * returned futures and the client statistics reflect exactly that.
     */
    @Test
    public void testSimpleRequests() throws Exception {
        final int numQueries = 1024;
        final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        Client<KvStateInternalRequest, KvStateResponse> client = null;
        Channel serverChannel = null;
        try {
            client = new Client<>("Test Client", 1, serializer, stats);

            // Random result
            final byte[] expected = new byte[1024];
            ThreadLocalRandom.current().nextBytes(expected);

            final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
            final AtomicReference<Channel> channel = new AtomicReference<>();
            serverChannel =
                    createServerChannel(new ChannelDataCollectingHandler(channel, received));
            final InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel);

            // Fire all requests up front; they are answered by the loop below.
            final List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>(numQueries);
            for (int i = 0; i < numQueries; i++) {
                KvStateInternalRequest request =
                        new KvStateInternalRequest(new KvStateID(), new byte[0]);
                futures.add(client.sendRequest(serverAddress, request));
            }

            // Respond to the requests in arrival order: even -> success, odd -> failure.
            final Exception testException = new RuntimeException("Expected test Exception");
            for (int i = 0; i < numQueries; i++) {
                ByteBuf buf = received.take();
                Assert.assertNotNull("Receive timed out", buf);

                Channel ch = channel.get();
                Assert.assertNotNull("Channel not active", ch);

                Assert.assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
                long requestId = MessageSerializer.getRequestId(buf);
                serializer.deserializeRequest(buf);
                buf.release();

                if (i % 2 == 0) {
                    ch.writeAndFlush(
                            MessageSerializer.serializeResponse(
                                    serverChannel.alloc(),
                                    requestId,
                                    new KvStateResponse(expected)));
                } else {
                    ch.writeAndFlush(
                            MessageSerializer.serializeRequestFailure(
                                    serverChannel.alloc(), requestId, testException));
                }
            }

            // Every future must complete according to the parity of its index.
            for (int i = 0; i < numQueries; i++) {
                if (i % 2 == 0) {
                    Assert.assertArrayEquals(expected, futures.get(i).get().getContent());
                } else {
                    try {
                        futures.get(i).get();
                        Assert.fail("Did not throw expected Exception");
                    } catch (ExecutionException e) {
                        Assert.assertTrue(
                                "Did not throw expected Exception",
                                e.getCause() instanceof RuntimeException);
                    }
                }
            }

            Assert.assertEquals(numQueries, stats.getNumRequests());

            // Poll until the success/failure counters have reached their expected values.
            final long expectedPerOutcome = numQueries / 2;
            while (stats.getNumSuccessful() != expectedPerOutcome
                    || stats.getNumFailed() != expectedPerOutcome) {
                Thread.sleep(100L);
            }

            Assert.assertEquals(expectedPerOutcome, stats.getNumSuccessful());
            Assert.assertEquals(expectedPerOutcome, stats.getNumFailed());
        } finally {
            // Single cleanup path: runs exactly once whether the test body passed or failed.
            if (client != null) {
                Exception shutdownFailure = null;
                try {
                    client.shutdown().get();
                } catch (Exception e) {
                    shutdownFailure = e;
                    LOG.error("An exception occurred while shutting down netty.", e);
                }
                Assert.assertTrue(
                        ExceptionUtils.stringifyException(shutdownFailure),
                        client.isEventGroupShutdown());
            }

            if (serverChannel != null) {
                serverChannel.close();
            }

            Assert.assertEquals("Channel leak", 0L, stats.getNumConnections());
        }
    }

    /**
     * Verifies that a request to an address obtained from {@code NetUtils.getAvailablePort()},
     * on which this test starts no server, fails with a {@link ConnectException} and leaves no
     * connection behind.
     */
    @Test
    public void testRequestUnavailableHost() throws Exception {
        final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        Client<KvStateInternalRequest, KvStateResponse> client = null;
        try {
            client = new Client<>("Test Client", 1, serializer, stats);

            final InetSocketAddress serverAddress =
                    new InetSocketAddress(InetAddress.getLocalHost(), NetUtils.getAvailablePort());
            final KvStateInternalRequest request =
                    new KvStateInternalRequest(new KvStateID(), new byte[0]);

            try {
                client.sendRequest(serverAddress, request).get();
                Assert.fail("Did not throw expected ConnectException");
            } catch (ExecutionException e) {
                Assert.assertTrue(
                        "Did not throw expected ConnectException",
                        e.getCause() instanceof ConnectException);
            }
        } finally {
            // Single cleanup path: runs exactly once whether the test body passed or failed.
            if (client != null) {
                try {
                    client.shutdown().get();
                } catch (Exception e) {
                    LOG.error("An exception occurred while shutting down netty.", e);
                }
                Assert.assertTrue(client.isEventGroupShutdown());
            }

            Assert.assertEquals("Channel leak", 0L, stats.getNumConnections());
        }
    }

    /**
     * Issues requests from several threads through one shared client against a server that
     * answers every request with the same payload, and verifies every response as well as the
     * request statistics.
     */
    @Test
    public void testConcurrentQueries() throws Exception {
        final int numQueryTasks = 4;
        final int numQueriesPerTask = 1024;

        final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        ExecutorService executor = null;
        Client<KvStateInternalRequest, KvStateResponse> client = null;
        Channel serverChannel = null;

        final byte[] serializedResult = new byte[1024];
        ThreadLocalRandom.current().nextBytes(serializedResult);

        try {
            executor = Executors.newFixedThreadPool(numQueryTasks);
            client = new Client<>("Test Client", 1, serializer, stats);
            serverChannel =
                    createServerChannel(new RespondingChannelHandler(serializer, serializedResult));
            final InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel);

            // 'client' is reassigned above, so the lambda needs an effectively final alias.
            final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
            final Callable<List<CompletableFuture<KvStateResponse>>> queryTask =
                    () -> {
                        List<CompletableFuture<KvStateResponse>> results =
                                new ArrayList<>(numQueriesPerTask);
                        for (int q = 0; q < numQueriesPerTask; q++) {
                            KvStateInternalRequest request =
                                    new KvStateInternalRequest(new KvStateID(), new byte[0]);
                            results.add(finalClient.sendRequest(serverAddress, request));
                        }
                        return results;
                    };

            // Submit the query tasks.
            final List<Future<List<CompletableFuture<KvStateResponse>>>> taskFutures =
                    new ArrayList<>(numQueryTasks);
            for (int t = 0; t < numQueryTasks; t++) {
                taskFutures.add(executor.submit(queryTask));
            }

            // Verify the result of every single request.
            for (Future<List<CompletableFuture<KvStateResponse>>> taskFuture : taskFutures) {
                for (CompletableFuture<KvStateResponse> response : taskFuture.get()) {
                    Assert.assertArrayEquals(serializedResult, response.get().getContent());
                }
            }

            // Poll until the success counter has reached the total number of requests.
            final int totalQueries = numQueryTasks * numQueriesPerTask;
            while (stats.getNumSuccessful() != totalQueries) {
                Thread.sleep(100L);
            }

            Assert.assertEquals(totalQueries, stats.getNumRequests());
            Assert.assertEquals(totalQueries, stats.getNumSuccessful());
        } finally {
            // Single cleanup path: runs exactly once whether the test body passed or failed.
            if (executor != null) {
                executor.shutdown();
            }

            if (serverChannel != null) {
                serverChannel.close();
            }

            if (client != null) {
                try {
                    client.shutdown().get();
                } catch (Exception e) {
                    LOG.error("An exception occurred while shutting down netty.", e);
                }
                Assert.assertTrue(client.isEventGroupShutdown());
            }

            Assert.assertEquals("Channel leak", 0L, stats.getNumConnections());
        }
    }

    /**
     * Verifies that a server failure message fails all pending requests and that the connection
     * count reported by the client statistics drops back to zero.
     */
    @Test
    public void testFailureClosesChannel() throws Exception {
        final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        Client<KvStateInternalRequest, KvStateResponse> client = null;
        Channel serverChannel = null;
        try {
            client = new Client<>("Test Client", 1, serializer, stats);

            final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
            final AtomicReference<Channel> channel = new AtomicReference<>();
            serverChannel =
                    createServerChannel(new ChannelDataCollectingHandler(channel, received));
            final InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel);

            // Two requests that will both be pending when the server failure arrives.
            final List<Future<KvStateResponse>> futures = new ArrayList<>();
            final KvStateInternalRequest request =
                    new KvStateInternalRequest(new KvStateID(), new byte[0]);
            futures.add(client.sendRequest(serverAddress, request));
            futures.add(client.sendRequest(serverAddress, request));

            // Wait until the server has received both requests.
            ByteBuf buf = received.take();
            Assert.assertNotNull("Receive timed out", buf);
            buf.release();

            buf = received.take();
            Assert.assertNotNull("Receive timed out", buf);
            buf.release();

            Assert.assertEquals(1L, stats.getNumConnections());

            Channel ch = channel.get();
            Assert.assertNotNull("Channel not active", ch);

            // Respond with a server-level failure instead of per-request answers.
            ch.writeAndFlush(
                    MessageSerializer.serializeServerFailure(
                            serverChannel.alloc(),
                            new RuntimeException("Expected test server failure")));

            // Both pending requests have to fail.
            for (Future<KvStateResponse> future : futures) {
                try {
                    future.get();
                    Assert.fail("Did not throw expected server failure");
                } catch (ExecutionException e) {
                    Assert.assertTrue(
                            "Did not throw expected Exception",
                            e.getCause() instanceof RuntimeException);
                }
            }

            Assert.assertEquals(0L, stats.getNumConnections());

            // Poll until the failure counters have reached their expected values.
            while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L) {
                Thread.sleep(100L);
            }

            Assert.assertEquals(2L, stats.getNumRequests());
            Assert.assertEquals(0L, stats.getNumSuccessful());
            Assert.assertEquals(2L, stats.getNumFailed());
        } finally {
            // Single cleanup path: runs exactly once whether the test body passed or failed.
            if (client != null) {
                try {
                    client.shutdown().get();
                } catch (Exception e) {
                    LOG.error("An exception occurred while shutting down netty.", e);
                }
                Assert.assertTrue(client.isEventGroupShutdown());
            }

            if (serverChannel != null) {
                serverChannel.close();
            }

            Assert.assertEquals("Channel leak", 0L, stats.getNumConnections());
        }
    }

    /**
     * Verifies that a pending request fails with a {@link ClosedChannelException} when the
     * server closes the connection, and that the connection count drops back to zero.
     */
    @Test
    public void testServerClosesChannel() throws Exception {
        final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        Client<KvStateInternalRequest, KvStateResponse> client = null;
        Channel serverChannel = null;
        try {
            client = new Client<>("Test Client", 1, serializer, stats);

            final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
            final AtomicReference<Channel> channel = new AtomicReference<>();
            serverChannel =
                    createServerChannel(new ChannelDataCollectingHandler(channel, received));
            final InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel);

            final KvStateInternalRequest request =
                    new KvStateInternalRequest(new KvStateID(), new byte[0]);
            final CompletableFuture<KvStateResponse> future =
                    client.sendRequest(serverAddress, request);

            // Wait until the server has received the request; the content is not needed, but
            // the buffer still has to be released because the collecting handler does not do so.
            received.take().release();

            Assert.assertEquals(1L, stats.getNumConnections());

            // Close the connection from the server side and wait for the close to finish.
            channel.get().close().await();

            try {
                future.get();
                Assert.fail("Did not throw expected server failure");
            } catch (ExecutionException e) {
                Assert.assertTrue(
                        "Did not throw expected Exception",
                        e.getCause() instanceof ClosedChannelException);
            }

            Assert.assertEquals(0L, stats.getNumConnections());

            // Poll until the failure counters have reached their expected values.
            while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L) {
                Thread.sleep(100L);
            }

            Assert.assertEquals(1L, stats.getNumRequests());
            Assert.assertEquals(0L, stats.getNumSuccessful());
            Assert.assertEquals(1L, stats.getNumFailed());
        } finally {
            // Single cleanup path: runs exactly once whether the test body passed or failed.
            if (client != null) {
                try {
                    client.shutdown().get();
                } catch (Exception e) {
                    LOG.error("An exception occurred while shutting down netty.", e);
                }
                Assert.assertTrue(client.isEventGroupShutdown());
            }

            if (serverChannel != null) {
                serverChannel.close();
            }

            Assert.assertEquals("Channel leak", 0L, stats.getNumConnections());
        }
    }

    /**
     * Runs several concurrent query tasks through one client against two real
     * {@link KvStateServerImpl} instances, shuts the client down once enough requests have been
     * issued, and verifies that the tasks fail as expected and that neither the client nor the
     * servers report leftover connections.
     */
    @Test
    public void testClientServerIntegration() throws Throwable {
        final int numServers = 2;
        final int numServerEventLoopThreads = 2;
        final int numServerQueryThreads = 2;
        final int numClientEventLoopThreads = 4;
        final int numClientTasks = 8;
        final int batchSize = 16;
        final long minRequests = 100_000L;

        // Keyed backend that holds the queryable state instances registered with the servers.
        final MemoryStateBackend stateBackend = new MemoryStateBackend();
        final KvStateRegistry backendRegistry = new KvStateRegistry();
        final DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(backendRegistry);

        final AbstractKeyedStateBackend<Integer> backend =
                stateBackend.createKeyedStateBackend(
                        dummyEnv,
                        new JobID(),
                        "test_op",
                        IntSerializer.INSTANCE,
                        1,
                        new KeyGroupRange(0, 0),
                        backendRegistry.createTaskRegistry(new JobID(), new JobVertexID()),
                        TtlTimeProvider.DEFAULT,
                        new UnregisteredMetricsGroup(),
                        Collections.emptyList(),
                        new CloseableRegistry());

        final AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        Client<KvStateInternalRequest, KvStateResponse> client = null;
        ExecutorService clientTaskExecutor = null;
        final KvStateServerImpl[] servers = new KvStateServerImpl[numServers];

        try {
            client = new Client<>("Test Client", numClientEventLoopThreads, serializer, clientStats);
            clientTaskExecutor = Executors.newFixedThreadPool(numClientTasks);

            // Same state descriptor for every server.
            final ValueStateDescriptor<Integer> desc =
                    new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
            desc.setQueryable("any");

            // Concrete stats type: getNumConnections() is read during the leak check below.
            final KvStateRegistry[] registries = new KvStateRegistry[numServers];
            final AtomicKvStateRequestStats[] serverStats =
                    new AtomicKvStateRequestStats[numServers];
            final KvStateID[] stateIds = new KvStateID[numServers];

            for (int i = 0; i < numServers; i++) {
                registries[i] = new KvStateRegistry();
                serverStats[i] = new AtomicKvStateRequestStats();
                servers[i] =
                        new KvStateServerImpl(
                                InetAddress.getLocalHost().getHostName(),
                                Collections.singletonList(0).iterator(),
                                numServerEventLoopThreads,
                                numServerQueryThreads,
                                registries[i],
                                serverStats[i]);
                servers[i].start();

                // Server i holds value 201 + i under key 1010 + i.
                backend.setCurrentKey(1010 + i);
                ValueState<Integer> state =
                        backend.getPartitionedState(
                                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
                state.update(201 + i);

                // Registration needs the internal view of the state handed out by the backend.
                InternalKvState<?, ?, ?> kvState = (InternalKvState<?, ?, ?>) state;

                stateIds[i] =
                        registries[i].registerKvState(
                                new JobID(),
                                new JobVertexID(),
                                new KeyGroupRange(0, 0),
                                "any",
                                kvState,
                                getClass().getClassLoader());
            }

            // 'client' is reassigned above, so the lambda needs an effectively final alias.
            final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
            final Callable<Void> queryTask =
                    () -> {
                        while (!Thread.interrupted()) {
                            // Shuffled indices decide which server each request targets.
                            List<Integer> shuffled = new ArrayList<>(batchSize);
                            for (int n = 0; n < batchSize; n++) {
                                shuffled.add(n);
                            }
                            Collections.shuffle(shuffled);

                            // Send the whole batch before waiting for any response.
                            List<Future<KvStateResponse>> pending = new ArrayList<>(batchSize);
                            for (int n = 0; n < batchSize; n++) {
                                int target = shuffled.get(n) % numServers;
                                byte[] keyAndNamespace =
                                        KvStateSerializer.serializeKeyAndNamespace(
                                                1010 + target,
                                                IntSerializer.INSTANCE,
                                                VoidNamespace.INSTANCE,
                                                VoidNamespaceSerializer.INSTANCE);
                                pending.add(
                                        finalClient.sendRequest(
                                                servers[target].getServerAddress(),
                                                new KvStateInternalRequest(
                                                        stateIds[target], keyAndNamespace)));
                            }

                            // Each response must carry the value stored for its target server.
                            for (int n = 0; n < batchSize; n++) {
                                int target = shuffled.get(n) % numServers;
                                int value =
                                        KvStateSerializer.deserializeValue(
                                                pending.get(n).get().getContent(),
                                                IntSerializer.INSTANCE);
                                Assert.assertEquals(201 + target, value);
                            }
                        }
                        // The task only ends by failing: interrupted here, or earlier with
                        // whatever a request or assertion threw.
                        throw new InterruptedException();
                    };

            // Submit the query tasks.
            final List<Future<Void>> taskFutures = new ArrayList<>(numClientTasks);
            for (int i = 0; i < numClientTasks; i++) {
                taskFutures.add(clientTaskExecutor.submit(queryTask));
            }

            // Let the tasks run until the client has issued enough requests.
            long numRequests;
            while ((numRequests = clientStats.getNumRequests()) < minRequests) {
                Thread.sleep(100L);
                LOG.info("Number of requests {}/100_000", numRequests);
            }

            // Shut down the client while the tasks are still querying.
            try {
                client.shutdown().get();
            } catch (Exception e) {
                LOG.error("An exception occurred while shutting down netty.", e);
            }
            Assert.assertTrue(client.isEventGroupShutdown());

            // After the shutdown every task has to fail with one of these causes.
            final CombinableMatcher<Throwable> exceptionMatcher =
                    CoreMatchers.either(FlinkMatchers.containsCause(ClosedChannelException.class))
                            .or(FlinkMatchers.containsCause(IllegalStateException.class));

            for (Future<Void> taskFuture : taskFutures) {
                try {
                    taskFuture.get();
                    Assert.fail("Did not throw expected Exception after shut down");
                } catch (ExecutionException e) {
                    Assert.assertThat(e, exceptionMatcher);
                }
            }

            Assert.assertEquals("Connection leak (client)", 0L, clientStats.getNumConnections());

            // The server-side count is retried with a growing back-off before giving up.
            for (int i = 0; i < numServers; i++) {
                boolean success = false;
                int numRetries = 0;
                while (!success) {
                    try {
                        Assert.assertEquals(
                                "Connection leak (server)",
                                0L,
                                serverStats[i].getNumConnections());
                        success = true;
                    } catch (Throwable t) {
                        if (numRetries >= 10) {
                            throw t;
                        }
                        LOG.info("Retrying connection leak check (server)");
                        Thread.sleep((numRetries + 1) * 50L);
                        numRetries++;
                    }
                }
            }
        } finally {
            // Single cleanup path: runs exactly once whether the test body passed or failed.
            if (client != null) {
                try {
                    client.shutdown().get();
                } catch (Exception e) {
                    LOG.error("An exception occurred while shutting down netty.", e);
                }
                Assert.assertTrue(client.isEventGroupShutdown());
            }

            for (KvStateServerImpl server : servers) {
                if (server != null) {
                    server.shutdown();
                }
            }

            if (clientTaskExecutor != null) {
                clientTaskExecutor.shutdown();
            }
        }
    }

    /**
     * Binds a server channel on the local host, using port 0 and the shared event loop group,
     * and waits for the bind to complete.
     *
     * <p>Every accepted connection gets a {@link LengthFieldBasedFrameDecoder} followed by the
     * given handlers in its pipeline.
     *
     * @param channelHandlerArr handlers appended after the frame decoder
     * @return the bound server channel
     * @throws UnknownHostException if the local host address cannot be resolved
     * @throws InterruptedException if interrupted while waiting for the bind to complete
     */
    private Channel createServerChannel(final ChannelHandler... channelHandlerArr) throws UnknownHostException, InterruptedException {
        ChannelInitializer<SocketChannel> childInitializer =
                new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                .addLast(channelHandlerArr);
                    }
                };

        ServerBootstrap bootstrap =
                new ServerBootstrap()
                        .localAddress(InetAddress.getLocalHost(), 0)
                        .group(nioGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(childInitializer);

        return bootstrap.bind().sync().channel();
    }

    /**
     * Returns the local address of the given channel as an {@link InetSocketAddress}.
     *
     * @param channel channel whose local address is requested
     * @return the channel's local address, cast to {@link InetSocketAddress}
     */
    private InetSocketAddress getKvStateServerAddress(Channel channel) {
        final Object localAddress = channel.localAddress();
        return (InetSocketAddress) localAddress;
    }
}
