package org.apache.flink.queryablestate.network;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.network.KvStateServerHandlerTest;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/queryablestate/network/KvStateServerTest.class */
/**
 * Tests {@link KvStateServerImpl} end to end: a raw Netty client connects to a
 * started server over a socket, sends one serialized state request, and checks
 * the serialized response.
 */
public class KvStateServerTest {
    /** Event loop group used by the client bootstrap created in {@link #createBootstrap}. */
    private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();

    /** Maximum time to wait for the server's response before failing the test. */
    private static final int TIMEOUT_MILLIS = 10000;

    /** Shuts down the shared client event loop group once all tests have run. */
    @AfterClass
    public static void tearDown() throws Exception {
        NIO_GROUP.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Registers a queryable {@code ValueState}, writes one value for one key, queries it
     * through a socket connection to the server and verifies message type, request id
     * and value of the response.
     */
    @Test
    public void testSimpleRequest() throws Throwable {
        final int key = 99812822;
        final int expectedValue = 712828289;
        // Deliberately larger than Integer.MAX_VALUE so the id only survives as a long.
        final long requestId = Integer.MAX_VALUE + 182828L;

        KvStateServerImpl server = null;
        Bootstrap bootstrap = null;
        try {
            KvStateRegistry registry = new KvStateRegistry();
            server = new KvStateServerImpl(
                    InetAddress.getLocalHost(),
                    Collections.singletonList(0).iterator(),
                    1,
                    1,
                    registry,
                    new AtomicKvStateRequestStats());
            server.start();
            InetSocketAddress serverAddress = server.getServerAddress();

            MemoryStateBackend stateBackend = new MemoryStateBackend();
            DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
            environment.setKvStateRegistry(registry);
            JobID jobId = new JobID();
            AbstractKeyedStateBackend<Integer> keyedBackend = stateBackend.createKeyedStateBackend(
                    environment,
                    jobId,
                    "test_op",
                    IntSerializer.INSTANCE,
                    1,
                    new KeyGroupRange(0, 0),
                    registry.createTaskRegistry(jobId, new JobVertexID()));

            // The listener is registered before the state is created; its
            // registrationName and kvStateId fields are read further below.
            KvStateServerHandlerTest.TestRegistryListener registryListener =
                    new KvStateServerHandlerTest.TestRegistryListener();
            registry.registerListener(jobId, registryListener);

            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
            descriptor.setQueryable("vanilla");
            ValueState<Integer> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

            // Store the value the query is expected to return.
            keyedBackend.setCurrentKey(key);
            state.update(expectedValue);

            byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
                    key,
                    IntSerializer.INSTANCE,
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE);

            // Frames received by the client are handed to the test thread via this queue.
            final LinkedBlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>();
            bootstrap = createBootstrap(
                    new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
                    new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            responses.add((ByteBuf) msg);
                        }
                    });
            Channel channel = bootstrap
                    .connect(serverAddress.getAddress(), serverAddress.getPort())
                    .sync()
                    .channel();

            Assert.assertEquals("vanilla", registryListener.registrationName);

            KvStateInternalRequest request =
                    new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
            channel.writeAndFlush(MessageSerializer.serializeRequest(channel.alloc(), requestId, request));

            // poll returns null on timeout; fail with a clear message instead of an NPE
            // inside the deserializer.
            ByteBuf response = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            Assert.assertNotNull(
                    "No response received from the server within " + TIMEOUT_MILLIS + " ms", response);
            try {
                Assert.assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(response));
                Assert.assertEquals(requestId, MessageSerializer.getRequestId(response));
                Integer actualValue = KvStateSerializer.deserializeValue(
                        server.getSerializer().deserializeResponse(response).getContent(),
                        IntSerializer.INSTANCE);
                Assert.assertEquals(expectedValue, actualValue.intValue());
            } finally {
                // The queueing handler does not forward the frame, so the test owns the
                // buffer and has to release it.
                response.release();
            }
        } finally {
            if (server != null) {
                server.shutdown();
            }
            if (bootstrap != null) {
                EventLoopGroup group = bootstrap.group();
                if (group != null) {
                    group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
                }
            }
        }
    }

    /**
     * Creates a client bootstrap on {@link #NIO_GROUP} using NIO socket channels.
     *
     * @param channelHandlerArr handlers appended, in the given order, to the pipeline
     *     of every channel the bootstrap initializes
     * @return the configured, not yet connected bootstrap
     */
    private Bootstrap createBootstrap(final ChannelHandler... channelHandlerArr) {
        return new Bootstrap()
                .group(NIO_GROUP)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(channelHandlerArr);
                    }
                });
    }
}
