/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class ClientTest {
    private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
    private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
    private NioEventLoopGroup nioGroup;

    @Before
    public void setUp() throws Exception {
        this.nioGroup = new NioEventLoopGroup();
    }

    @After
    public void tearDown() throws Exception {
        if (this.nioGroup != null) {
            this.nioGroup.shutdownGracefully();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimpleRequests() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        Channel serverChannel = null;
        try {
            long i;
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            byte[] expected = new byte[1024];
            ThreadLocalRandom.current().nextBytes(expected);
            final LinkedBlockingQueue received = new LinkedBlockingQueue();
            final AtomicReference channel = new AtomicReference();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    channel.set(ctx.channel());
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    received.add((ByteBuf)msg);
                }
            }});
            InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            long numQueries = 1024L;
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
            for (long i2 = 0L; i2 < numQueries; ++i2) {
                KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
                futures.add(client.sendRequest(serverAddress, (MessageBody)request));
            }
            RuntimeException testException = new RuntimeException("Expected test Exception");
            for (i = 0L; i < numQueries; ++i) {
                ByteBuf response;
                ByteBuf buf = (ByteBuf)received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull((String)"Receive timed out", (Object)buf);
                Channel ch = (Channel)channel.get();
                Assert.assertNotNull((String)"Channel not active", (Object)ch);
                Assert.assertEquals((Object)MessageType.REQUEST, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
                long requestId = MessageSerializer.getRequestId((ByteBuf)buf);
                KvStateInternalRequest deserRequest = (KvStateInternalRequest)serializer.deserializeRequest(buf);
                buf.release();
                if (i % 2L == 0L) {
                    response = MessageSerializer.serializeResponse((ByteBufAllocator)serverChannel.alloc(), (long)requestId, (MessageBody)new KvStateResponse(expected));
                    ch.writeAndFlush((Object)response);
                    continue;
                }
                response = MessageSerializer.serializeRequestFailure((ByteBufAllocator)serverChannel.alloc(), (long)requestId, (Throwable)testException);
                ch.writeAndFlush((Object)response);
            }
            for (i = 0L; i < numQueries; ++i) {
                if (i % 2L == 0L) {
                    KvStateResponse serializedResult = (KvStateResponse)((CompletableFuture)futures.get((int)i)).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.assertArrayEquals((byte[])expected, (byte[])serializedResult.getContent());
                    continue;
                }
                try {
                    ((CompletableFuture)futures.get((int)i)).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.fail((String)"Did not throw expected Exception");
                    continue;
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof RuntimeException) continue;
                    Assert.fail((String)"Did not throw expected Exception");
                }
            }
            Assert.assertEquals((long)numQueries, (long)stats.getNumRequests());
            long expectedRequests = numQueries / 2L;
            while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests || stats.getNumFailed() != expectedRequests)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)expectedRequests, (long)stats.getNumSuccessful());
            Assert.assertEquals((long)expectedRequests, (long)stats.getNumFailed());
        }
        finally {
            if (client != null) {
                Exception exc = null;
                try {
                    client.shutdown().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    exc = e;
                    LOG.error("An exception occurred while shutting down netty.", (Throwable)e);
                }
                Assert.assertTrue((String)ExceptionUtils.stringifyException((Throwable)exc), (boolean)client.isEventGroupShutdown());
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestUnavailableHost() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        try {
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            int availablePort = NetUtils.getAvailablePort();
            InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), availablePort);
            KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
            CompletableFuture future = client.sendRequest(serverAddress, (MessageBody)request);
            try {
                future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"Did not throw expected ConnectException");
            }
            catch (ExecutionException e) {
                if (!(e.getCause() instanceof ConnectException)) {
                    Assert.fail((String)"Did not throw expected ConnectException");
                }
            }
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assert.assertTrue((boolean)client.isEventGroupShutdown());
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentQueries() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        final MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        ExecutorService executor = null;
        Client client = null;
        Channel serverChannel = null;
        final byte[] serializedResult = new byte[1024];
        ThreadLocalRandom.current().nextBytes(serializedResult);
        try {
            int numQueryTasks = 4;
            int numQueriesPerTask = 1024;
            executor = Executors.newFixedThreadPool(numQueryTasks);
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf)msg;
                    Assert.assertEquals((Object)MessageType.REQUEST, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
                    long requestId = MessageSerializer.getRequestId((ByteBuf)buf);
                    KvStateInternalRequest request = (KvStateInternalRequest)serializer.deserializeRequest(buf);
                    buf.release();
                    KvStateResponse response = new KvStateResponse(serializedResult);
                    ByteBuf serResponse = MessageSerializer.serializeResponse((ByteBufAllocator)ctx.alloc(), (long)requestId, (MessageBody)response);
                    ctx.channel().writeAndFlush((Object)serResponse);
                }
            }});
            InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            Client finalClient = client;
            Callable<List> queryTask = () -> {
                ArrayList<CompletableFuture> results = new ArrayList<CompletableFuture>(1024);
                for (int i = 0; i < 1024; ++i) {
                    KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
                    results.add(finalClient.sendRequest(serverAddress, (MessageBody)request));
                }
                return results;
            };
            ArrayList<Future<List>> futures = new ArrayList<Future<List>>();
            for (int i = 0; i < numQueryTasks; ++i) {
                futures.add(executor.submit(queryTask));
            }
            for (Future future : futures) {
                List results = (List)future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                for (CompletableFuture result : results) {
                    KvStateResponse actual = (KvStateResponse)result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.assertArrayEquals((byte[])serializedResult, (byte[])actual.getContent());
                }
            }
            int totalQueries = numQueryTasks * 1024;
            while (deadline.hasTimeLeft() && stats.getNumSuccessful() != (long)totalQueries) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)totalQueries, (long)stats.getNumRequests());
            Assert.assertEquals((long)totalQueries, (long)stats.getNumSuccessful());
        }
        finally {
            if (executor != null) {
                executor.shutdown();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            if (client != null) {
                try {
                    client.shutdown().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assert.assertTrue((boolean)client.isEventGroupShutdown());
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailureClosesChannel() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        Channel serverChannel = null;
        try {
            block15: {
                ArrayList<CompletableFuture> futures;
                block14: {
                    client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
                    final LinkedBlockingQueue received = new LinkedBlockingQueue();
                    final AtomicReference channel = new AtomicReference();
                    serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            channel.set(ctx.channel());
                        }

                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            received.add((ByteBuf)msg);
                        }
                    }});
                    InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
                    futures = new ArrayList<CompletableFuture>();
                    KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
                    futures.add(client.sendRequest(serverAddress, (MessageBody)request));
                    futures.add(client.sendRequest(serverAddress, (MessageBody)request));
                    ByteBuf buf = (ByteBuf)received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.assertNotNull((String)"Receive timed out", (Object)buf);
                    buf.release();
                    buf = (ByteBuf)received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.assertNotNull((String)"Receive timed out", (Object)buf);
                    buf.release();
                    Assert.assertEquals((long)1L, (long)stats.getNumConnections());
                    Channel ch = (Channel)channel.get();
                    Assert.assertNotNull((String)"Channel not active", (Object)ch);
                    ch.writeAndFlush((Object)MessageSerializer.serializeServerFailure((ByteBufAllocator)serverChannel.alloc(), (Throwable)new RuntimeException("Expected test server failure")));
                    try {
                        ((Future)futures.remove(0)).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        Assert.fail((String)"Did not throw expected server failure");
                    }
                    catch (ExecutionException e) {
                        if (e.getCause() instanceof RuntimeException) break block14;
                        Assert.fail((String)"Did not throw expected Exception");
                    }
                }
                try {
                    ((Future)futures.remove(0)).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.fail((String)"Did not throw expected server failure");
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof RuntimeException) break block15;
                    Assert.fail((String)"Did not throw expected Exception");
                }
            }
            Assert.assertEquals((long)0L, (long)stats.getNumConnections());
            while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)2L, (long)stats.getNumRequests());
            Assert.assertEquals((long)0L, (long)stats.getNumSuccessful());
            Assert.assertEquals((long)2L, (long)stats.getNumFailed());
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assert.assertTrue((boolean)client.isEventGroupShutdown());
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testServerClosesChannel() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        Channel serverChannel = null;
        try {
            block13: {
                client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
                final AtomicBoolean received = new AtomicBoolean();
                final AtomicReference channel = new AtomicReference();
                serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        channel.set(ctx.channel());
                    }

                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        received.set(true);
                    }
                }});
                InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
                KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
                CompletableFuture future = client.sendRequest(serverAddress, (MessageBody)request);
                while (!received.get() && deadline.hasTimeLeft()) {
                    Thread.sleep(50L);
                }
                Assert.assertTrue((String)"Receive timed out", (boolean)received.get());
                Assert.assertEquals((long)1L, (long)stats.getNumConnections());
                ((Channel)channel.get()).close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                try {
                    future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                    Assert.fail((String)"Did not throw expected server failure");
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof ClosedChannelException) break block13;
                    Assert.fail((String)"Did not throw expected Exception");
                }
            }
            Assert.assertEquals((long)0L, (long)stats.getNumConnections());
            while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)1L, (long)stats.getNumRequests());
            Assert.assertEquals((long)0L, (long)stats.getNumSuccessful());
            Assert.assertEquals((long)1L, (long)stats.getNumFailed());
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assert.assertTrue((boolean)client.isEventGroupShutdown());
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testClientServerIntegration() throws Throwable {
        int numServers = 2;
        int numServerEventLoopThreads = 2;
        int numServerQueryThreads = 2;
        int numClientEventLoopThreads = 4;
        int numClientsTasks = 8;
        int batchSize = 16;
        boolean numKeyGroups = true;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        KvStateRegistry dummyRegistry = new KvStateRegistry();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(dummyRegistry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
        FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
        AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        ExecutorService clientTaskExecutor = null;
        KvStateServerImpl[] server = new KvStateServerImpl[2];
        try {
            long numRequests;
            client = new Client("Test Client", 4, serializer, (KvStateRequestStats)clientStats);
            clientTaskExecutor = Executors.newFixedThreadPool(8);
            ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("any");
            KvStateRegistry[] registry = new KvStateRegistry[2];
            AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[2];
            KvStateID[] ids = new KvStateID[2];
            for (int i = 0; i < 2; ++i) {
                registry[i] = new KvStateRegistry();
                serverStats[i] = new AtomicKvStateRequestStats();
                server[i] = new KvStateServerImpl(InetAddress.getLocalHost(), Collections.singletonList(0).iterator(), Integer.valueOf(2), Integer.valueOf(2), registry[i], (KvStateRequestStats)serverStats[i]);
                server[i].start();
                backend.setCurrentKey((Object)(1010 + i));
                ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
                state.update((Object)(201 + i));
                InternalKvState kvState = (InternalKvState)state;
                ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
            }
            Client finalClient = client;
            Callable<Void> queryTask = () -> {
                block0: while (true) {
                    int targetServer;
                    int j;
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    ArrayList<Integer> random = new ArrayList<Integer>();
                    for (int j2 = 0; j2 < 16; ++j2) {
                        random.add(j2);
                    }
                    Collections.shuffle(random);
                    ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(16);
                    for (j = 0; j < 16; ++j) {
                        targetServer = (Integer)random.get(j) % 2;
                        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)(1010 + targetServer), (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
                        KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
                        futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), (MessageBody)request));
                    }
                    j = 0;
                    while (true) {
                        if (j >= 16) continue block0;
                        targetServer = (Integer)random.get(j) % 2;
                        Future future = (Future)futures.get(j);
                        byte[] buf = ((KvStateResponse)future.get(timeout.toMillis(), TimeUnit.MILLISECONDS)).getContent();
                        int value = (Integer)KvStateSerializer.deserializeValue((byte[])buf, (TypeSerializer)IntSerializer.INSTANCE);
                        Assert.assertEquals((long)(201L + (long)targetServer), (long)value);
                        ++j;
                    }
                    break;
                }
            };
            ArrayList<Future<Void>> taskFutures = new ArrayList<Future<Void>>();
            for (int i = 0; i < 8; ++i) {
                taskFutures.add(clientTaskExecutor.submit(queryTask));
            }
            while ((numRequests = clientStats.getNumRequests()) < 100000L) {
                Thread.sleep(100L);
                LOG.info("Number of requests {}/100_000", (Object)numRequests);
            }
            try {
                client.shutdown().get(10L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            Assert.assertTrue((boolean)client.isEventGroupShutdown());
            for (Future future : taskFutures) {
                try {
                    future.get();
                    Assert.fail((String)"Did not throw expected Exception after shut down");
                }
                catch (ExecutionException t) {
                    if (t.getCause().getCause() instanceof ClosedChannelException || t.getCause().getCause() instanceof IllegalStateException) continue;
                    t.printStackTrace();
                    Assert.fail((String)("Failed with unexpected Exception type: " + t.getClass().getName()));
                }
            }
            Assert.assertEquals((String)"Connection leak (client)", (long)0L, (long)clientStats.getNumConnections());
            for (int i = 0; i < 2; ++i) {
                boolean bl = false;
                int numRetries = 0;
                while (!bl) {
                    try {
                        Assert.assertEquals((String)"Connection leak (server)", (long)0L, (long)serverStats[i].getNumConnections());
                        bl = true;
                    }
                    catch (Throwable t) {
                        if (numRetries < 10) {
                            LOG.info("Retrying connection leak check (server)");
                            Thread.sleep((long)(numRetries + 1) * 50L);
                            ++numRetries;
                            continue;
                        }
                        throw t;
                    }
                }
            }
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assert.assertTrue((boolean)client.isEventGroupShutdown());
            }
            for (int i = 0; i < 2; ++i) {
                if (server[i] == null) continue;
                server[i].shutdown();
            }
            if (clientTaskExecutor != null) {
                clientTaskExecutor.shutdown();
            }
        }
    }

    private Channel createServerChannel(final ChannelHandler ... handlers) throws UnknownHostException, InterruptedException {
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().localAddress(InetAddress.getLocalHost(), 0)).group((EventLoopGroup)this.nioGroup).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(handlers);
            }
        });
        return bootstrap.bind().sync().channel();
    }

    private InetSocketAddress getKvStateServerAddress(Channel serverChannel) {
        return (InetSocketAddress)serverChannel.localAddress();
    }
}

