package org.apache.flink.runtime.query.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.message.KvStateRequest;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.NetUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateClientTest.class */
public class KvStateClientTest {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class);
    private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
    private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);

    @AfterClass
    public static void tearDown() throws Exception {
        if (NIO_GROUP != null) {
            NIO_GROUP.shutdownGracefully();
        }
    }

    @Test
    public void testSimpleRequests() throws Exception {
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        KvStateClient kvStateClient = null;
        Channel channel = null;
        try {
            kvStateClient = new KvStateClient(1, atomicKvStateRequestStats);
            byte[] bArr = new byte[1024];
            ThreadLocalRandom.current().nextBytes(bArr);
            final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            final AtomicReference atomicReference = new AtomicReference();
            channel = createServerChannel(new ChannelInboundHandlerAdapter() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.1
                public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    atomicReference.set(channelHandlerContext.channel());
                }

                public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    linkedBlockingQueue.add((ByteBuf) obj);
                }
            });
            KvStateServerAddress kvStateServerAddress = getKvStateServerAddress(channel);
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 1024; i++) {
                arrayList.add(kvStateClient.getKvState(kvStateServerAddress, new KvStateID(), new byte[0]));
            }
            RuntimeException runtimeException = new RuntimeException("Expected test Exception");
            for (int i2 = 0; i2 < 1024; i2++) {
                ByteBuf byteBuf = (ByteBuf) linkedBlockingQueue.poll(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull("Receive timed out", byteBuf);
                Channel channel2 = (Channel) atomicReference.get();
                Assert.assertNotNull("Channel not active", channel2);
                Assert.assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(byteBuf));
                KvStateRequest deserializeKvStateRequest = KvStateRequestSerializer.deserializeKvStateRequest(byteBuf);
                byteBuf.release();
                if (i2 % 2 == 0) {
                    channel2.writeAndFlush(KvStateRequestSerializer.serializeKvStateRequestResult(channel.alloc(), deserializeKvStateRequest.getRequestId(), bArr));
                } else {
                    channel2.writeAndFlush(KvStateRequestSerializer.serializeKvStateRequestFailure(channel.alloc(), deserializeKvStateRequest.getRequestId(), runtimeException));
                }
            }
            for (int i3 = 0; i3 < 1024; i3++) {
                if (i3 % 2 == 0) {
                    Assert.assertArrayEquals(bArr, (byte[]) Await.result((Awaitable) arrayList.get(i3), fromNow.timeLeft()));
                } else {
                    try {
                        Await.result((Awaitable) arrayList.get(i3), fromNow.timeLeft());
                        Assert.fail("Did not throw expected Exception");
                    } catch (RuntimeException e) {
                    }
                }
            }
            Assert.assertEquals(1024, atomicKvStateRequestStats.getNumRequests());
            int i4 = 1024 / 2;
            while (fromNow.hasTimeLeft() && (atomicKvStateRequestStats.getNumSuccessful() != i4 || atomicKvStateRequestStats.getNumFailed() != i4)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals(i4, atomicKvStateRequestStats.getNumSuccessful());
            Assert.assertEquals(i4, atomicKvStateRequestStats.getNumFailed());
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            if (channel != null) {
                channel.close();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
        } catch (Throwable th) {
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            if (channel != null) {
                channel.close();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
            throw th;
        }
    }

    @Test
    public void testRequestUnavailableHost() throws Exception {
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        KvStateClient kvStateClient = null;
        try {
            kvStateClient = new KvStateClient(1, atomicKvStateRequestStats);
            try {
                Await.result(kvStateClient.getKvState(new KvStateServerAddress(InetAddress.getLocalHost(), NetUtils.getAvailablePort()), new KvStateID(), new byte[0]), fromNow.timeLeft());
                Assert.fail("Did not throw expected ConnectException");
            } catch (ConnectException e) {
            }
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
        } catch (Throwable th) {
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
            throw th;
        }
    }

    @Test
    public void testConcurrentQueries() throws Exception {
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        ExecutorService executorService = null;
        final KvStateClient kvStateClient = null;
        Channel channel = null;
        final byte[] bArr = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bArr);
        try {
            executorService = Executors.newFixedThreadPool(4);
            kvStateClient = new KvStateClient(1, atomicKvStateRequestStats);
            channel = createServerChannel(new ChannelInboundHandlerAdapter() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.2
                public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    ByteBuf byteBuf = (ByteBuf) obj;
                    Assert.assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(byteBuf));
                    KvStateRequest deserializeKvStateRequest = KvStateRequestSerializer.deserializeKvStateRequest(byteBuf);
                    byteBuf.release();
                    channelHandlerContext.channel().writeAndFlush(KvStateRequestSerializer.serializeKvStateRequestResult(channelHandlerContext.alloc(), deserializeKvStateRequest.getRequestId(), bArr));
                }
            });
            final KvStateServerAddress kvStateServerAddress = getKvStateServerAddress(channel);
            Callable<List<Future<byte[]>>> callable = new Callable<List<Future<byte[]>>>() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public List<Future<byte[]>> call() throws Exception {
                    ArrayList arrayList = new ArrayList(1024);
                    for (int i = 0; i < 1024; i++) {
                        arrayList.add(kvStateClient.getKvState(kvStateServerAddress, new KvStateID(), new byte[0]));
                    }
                    return arrayList;
                }
            };
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 4; i++) {
                arrayList.add(executorService.submit(callable));
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Iterator it2 = ((List) ((java.util.concurrent.Future) it.next()).get(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS)).iterator();
                while (it2.hasNext()) {
                    Assert.assertArrayEquals(bArr, (byte[]) Await.result((Future) it2.next(), fromNow.timeLeft()));
                }
            }
            int i2 = 4 * 1024;
            while (fromNow.hasTimeLeft() && atomicKvStateRequestStats.getNumSuccessful() != i2) {
                Thread.sleep(100L);
            }
            Assert.assertEquals(i2, atomicKvStateRequestStats.getNumRequests());
            Assert.assertEquals(i2, atomicKvStateRequestStats.getNumSuccessful());
            if (executorService != null) {
                executorService.shutdown();
            }
            if (channel != null) {
                channel.close();
            }
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
        } catch (Throwable th) {
            if (executorService != null) {
                executorService.shutdown();
            }
            if (channel != null) {
                channel.close();
            }
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
            throw th;
        }
    }

    @Test
    public void testFailureClosesChannel() throws Exception {
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        KvStateClient kvStateClient = null;
        Channel channel = null;
        try {
            kvStateClient = new KvStateClient(1, atomicKvStateRequestStats);
            final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            final AtomicReference atomicReference = new AtomicReference();
            channel = createServerChannel(new ChannelInboundHandlerAdapter() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.4
                public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    atomicReference.set(channelHandlerContext.channel());
                }

                public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    linkedBlockingQueue.add((ByteBuf) obj);
                }
            });
            KvStateServerAddress kvStateServerAddress = getKvStateServerAddress(channel);
            ArrayList arrayList = new ArrayList();
            arrayList.add(kvStateClient.getKvState(kvStateServerAddress, new KvStateID(), new byte[0]));
            arrayList.add(kvStateClient.getKvState(kvStateServerAddress, new KvStateID(), new byte[0]));
            ByteBuf byteBuf = (ByteBuf) linkedBlockingQueue.poll(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Assert.assertNotNull("Receive timed out", byteBuf);
            byteBuf.release();
            ByteBuf byteBuf2 = (ByteBuf) linkedBlockingQueue.poll(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Assert.assertNotNull("Receive timed out", byteBuf2);
            byteBuf2.release();
            Assert.assertEquals(1L, atomicKvStateRequestStats.getNumConnections());
            Channel channel2 = (Channel) atomicReference.get();
            Assert.assertNotNull("Channel not active", channel2);
            channel2.writeAndFlush(KvStateRequestSerializer.serializeServerFailure(channel.alloc(), new RuntimeException("Expected test server failure")));
            try {
                Await.result((Awaitable) arrayList.remove(0), fromNow.timeLeft());
                Assert.fail("Did not throw expected server failure");
            } catch (RuntimeException e) {
            }
            try {
                Await.result((Awaitable) arrayList.remove(0), fromNow.timeLeft());
                Assert.fail("Did not throw expected server failure");
            } catch (RuntimeException e2) {
            }
            Assert.assertEquals(0L, atomicKvStateRequestStats.getNumConnections());
            while (fromNow.hasTimeLeft() && (atomicKvStateRequestStats.getNumSuccessful() != 0 || atomicKvStateRequestStats.getNumFailed() != 2)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals(2L, atomicKvStateRequestStats.getNumRequests());
            Assert.assertEquals(0L, atomicKvStateRequestStats.getNumSuccessful());
            Assert.assertEquals(2L, atomicKvStateRequestStats.getNumFailed());
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            if (channel != null) {
                channel.close();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
        } catch (Throwable th) {
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            if (channel != null) {
                channel.close();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
            throw th;
        }
    }

    @Test
    public void testServerClosesChannel() throws Exception {
        Deadline fromNow = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        KvStateClient kvStateClient = null;
        Channel channel = null;
        try {
            kvStateClient = new KvStateClient(1, atomicKvStateRequestStats);
            final AtomicBoolean atomicBoolean = new AtomicBoolean();
            final AtomicReference atomicReference = new AtomicReference();
            channel = createServerChannel(new ChannelInboundHandlerAdapter() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.5
                public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    atomicReference.set(channelHandlerContext.channel());
                }

                public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    atomicBoolean.set(true);
                }
            });
            Future kvState = kvStateClient.getKvState(getKvStateServerAddress(channel), new KvStateID(), new byte[0]);
            while (!atomicBoolean.get() && fromNow.hasTimeLeft()) {
                Thread.sleep(50L);
            }
            Assert.assertTrue("Receive timed out", atomicBoolean.get());
            Assert.assertEquals(1L, atomicKvStateRequestStats.getNumConnections());
            ((Channel) atomicReference.get()).close().await(fromNow.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            try {
                Await.result(kvState, fromNow.timeLeft());
                Assert.fail("Did not throw expected server failure");
            } catch (ClosedChannelException e) {
            }
            Assert.assertEquals(0L, atomicKvStateRequestStats.getNumConnections());
            while (fromNow.hasTimeLeft() && (atomicKvStateRequestStats.getNumSuccessful() != 0 || atomicKvStateRequestStats.getNumFailed() != 1)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals(1L, atomicKvStateRequestStats.getNumRequests());
            Assert.assertEquals(0L, atomicKvStateRequestStats.getNumSuccessful());
            Assert.assertEquals(1L, atomicKvStateRequestStats.getNumFailed());
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            if (channel != null) {
                channel.close();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
        } catch (Throwable th) {
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            if (channel != null) {
                channel.close();
            }
            Assert.assertEquals("Channel leak", 0L, atomicKvStateRequestStats.getNumConnections());
            throw th;
        }
    }

    @Test
    public void testClientServerIntegration() throws Exception {
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setKvStateRegistry(kvStateRegistry);
        AbstractKeyedStateBackend createKeyedStateBackend = memoryStateBackend.createKeyedStateBackend(dummyEnvironment, new JobID(), "test_op", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), kvStateRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
        final FiniteDuration finiteDuration = new FiniteDuration(10L, TimeUnit.SECONDS);
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        final KvStateClient kvStateClient = null;
        ExecutorService executorService = null;
        final KvStateServer[] kvStateServerArr = new KvStateServer[2];
        try {
            kvStateClient = new KvStateClient(4, atomicKvStateRequestStats);
            executorService = Executors.newFixedThreadPool(8);
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", IntSerializer.INSTANCE);
            valueStateDescriptor.setQueryable("any");
            KvStateRegistry[] kvStateRegistryArr = new KvStateRegistry[2];
            KvStateRequestStats[] kvStateRequestStatsArr = new AtomicKvStateRequestStats[2];
            final KvStateID[] kvStateIDArr = new KvStateID[2];
            for (int i = 0; i < 2; i++) {
                kvStateRegistryArr[i] = new KvStateRegistry();
                kvStateRequestStatsArr[i] = new AtomicKvStateRequestStats();
                kvStateServerArr[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 2, 2, kvStateRegistryArr[i], kvStateRequestStatsArr[i]);
                kvStateServerArr[i].start();
                createKeyedStateBackend.setCurrentKey(Integer.valueOf(1010 + i));
                InternalKvState internalKvState = (ValueState) createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
                internalKvState.update(Integer.valueOf(201 + i));
                kvStateIDArr[i] = kvStateRegistryArr[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", internalKvState);
            }
            Callable<Void> callable = new Callable<Void>() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.6
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    while (!Thread.interrupted()) {
                        ArrayList arrayList = new ArrayList();
                        for (int i2 = 0; i2 < 16; i2++) {
                            arrayList.add(Integer.valueOf(i2));
                        }
                        Collections.shuffle(arrayList);
                        ArrayList arrayList2 = new ArrayList(16);
                        for (int i3 = 0; i3 < 16; i3++) {
                            int intValue = ((Integer) arrayList.get(i3)).intValue() % 2;
                            arrayList2.add(kvStateClient.getKvState(kvStateServerArr[intValue].getAddress(), kvStateIDArr[intValue], KvStateRequestSerializer.serializeKeyAndNamespace(Integer.valueOf(1010 + intValue), IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE)));
                        }
                        for (int i4 = 0; i4 < 16; i4++) {
                            Assert.assertEquals(201 + (((Integer) arrayList.get(i4)).intValue() % 2), ((Integer) KvStateRequestSerializer.deserializeValue((byte[]) Await.result((Future) arrayList2.get(i4), finiteDuration), IntSerializer.INSTANCE)).intValue());
                        }
                    }
                    throw new InterruptedException();
                }
            };
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 8; i2++) {
                arrayList.add(executorService.submit(callable));
            }
            while (true) {
                long numRequests = atomicKvStateRequestStats.getNumRequests();
                if (numRequests >= 100000) {
                    break;
                }
                Thread.sleep(100L);
                LOG.info("Number of requests {}/100_000", Long.valueOf(numRequests));
            }
            kvStateClient.shutDown();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                try {
                    ((java.util.concurrent.Future) it.next()).get();
                    Assert.fail("Did not throw expected Exception after shut down");
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof ClosedChannelException) && !(e.getCause() instanceof IllegalStateException)) {
                        e.printStackTrace();
                        Assert.fail("Failed with unexpected Exception type: " + e.getClass().getName());
                    }
                }
            }
            Assert.assertEquals("Connection leak (client)", 0L, atomicKvStateRequestStats.getNumConnections());
            for (int i3 = 0; i3 < 2; i3++) {
                boolean z = false;
                int i4 = 0;
                while (!z) {
                    try {
                        Assert.assertEquals("Connection leak (server)", 0L, kvStateRequestStatsArr[i3].getNumConnections());
                        z = true;
                    } catch (Throwable th) {
                        if (i4 >= 10) {
                            throw th;
                        }
                        LOG.info("Retrying connection leak check (server)");
                        Thread.sleep((i4 + 1) * 50);
                        i4++;
                    }
                }
            }
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            for (int i5 = 0; i5 < 2; i5++) {
                if (kvStateServerArr[i5] != null) {
                    kvStateServerArr[i5].shutDown();
                }
            }
            if (executorService != null) {
                executorService.shutdown();
            }
        } catch (Throwable th2) {
            if (kvStateClient != null) {
                kvStateClient.shutDown();
            }
            for (int i6 = 0; i6 < 2; i6++) {
                if (kvStateServerArr[i6] != null) {
                    kvStateServerArr[i6].shutDown();
                }
            }
            if (executorService != null) {
                executorService.shutdown();
            }
            throw th2;
        }
    }

    private Channel createServerChannel(final ChannelHandler... channelHandlerArr) throws UnknownHostException, InterruptedException {
        return new ServerBootstrap().localAddress(InetAddress.getLocalHost(), 0).group(NIO_GROUP).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.query.netty.KvStateClientTest.7
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(channelHandlerArr);
            }
        }).bind().sync().channel();
    }

    private KvStateServerAddress getKvStateServerAddress(Channel channel) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.localAddress();
        return new KvStateServerAddress(inetSocketAddress.getAddress(), inetSocketAddress.getPort());
    }
}
