package org.apache.flink.queryablestate.network;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.messages.RequestFailure;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/queryablestate/network/KvStateServerHandlerTest.class */
public class KvStateServerHandlerTest extends TestLogger {
    private static KvStateServerImpl testServer;
    private static final long READ_TIMEOUT_MILLIS = 10000;

    /* loaded from: input_file:org/apache/flink/queryablestate/network/KvStateServerHandlerTest$TestRegistryListener.class */
    static class TestRegistryListener implements KvStateRegistryListener {
        volatile JobVertexID jobVertexID;
        volatile KeyGroupRange keyGroupIndex;
        volatile String registrationName;
        volatile KvStateID kvStateId;

        public void notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID) {
            this.jobVertexID = jobVertexID;
            this.keyGroupIndex = keyGroupRange;
            this.registrationName = str;
            this.kvStateId = kvStateID;
        }

        public void notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) {
        }
    }

    @BeforeClass
    public static void setup() {
        try {
            testServer = new KvStateServerImpl(InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), 1, 1, new KvStateRegistry(), new DisabledKvStateRequestStats());
            testServer.start();
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    @AfterClass
    public static void tearDown() throws Exception {
        testServer.shutdown();
    }

    @Test
    public void testSimpleQuery() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        MessageSerializer messageSerializer = new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, messageSerializer, atomicKvStateRequestStats)});
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", IntSerializer.INSTANCE);
        valueStateDescriptor.setQueryable("vanilla");
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setKvStateRegistry(kvStateRegistry);
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend(kvStateRegistry, 1, memoryStateBackend, dummyEnvironment);
        TestRegistryListener testRegistryListener = new TestRegistryListener();
        kvStateRegistry.registerListener(dummyEnvironment.getJobID(), testRegistryListener);
        createKeyedStateBackend.setCurrentKey(99812822);
        createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor).update(712828289);
        byte[] serializeKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(99812822, IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
        Assert.assertTrue(testRegistryListener.registrationName.equals("vanilla"));
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 2147666475L, new KvStateInternalRequest(testRegistryListener.kvStateId, serializeKeyAndNamespace))});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(byteBuf));
        long requestId = MessageSerializer.getRequestId(byteBuf);
        KvStateResponse deserializeResponse = messageSerializer.deserializeResponse(byteBuf);
        byteBuf.release();
        Assert.assertEquals(2147666475L, requestId);
        Assert.assertEquals(712828289, ((Integer) KvStateSerializer.deserializeValue(deserializeResponse.getContent(), IntSerializer.INSTANCE)).intValue());
        Assert.assertEquals(atomicKvStateRequestStats.toString(), 1L, atomicKvStateRequestStats.getNumRequests());
        long nanoTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30L, TimeUnit.SECONDS);
        while (atomicKvStateRequestStats.getNumSuccessful() != 1 && System.nanoTime() <= nanoTime) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(atomicKvStateRequestStats.toString(), 1L, atomicKvStateRequestStats.getNumSuccessful());
    }

    @Test
    public void testQueryUnknownKvStateID() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), atomicKvStateRequestStats)});
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 2147666475L, new KvStateInternalRequest(new KvStateID(), new byte[0]))});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        RequestFailure deserializeRequestFailure = MessageSerializer.deserializeRequestFailure(byteBuf);
        byteBuf.release();
        Assert.assertEquals(2147666475L, deserializeRequestFailure.getRequestId());
        Assert.assertTrue("Did not respond with expected failure cause", deserializeRequestFailure.getCause() instanceof UnknownKvStateIdException);
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumFailed());
    }

    @Test
    public void testQueryUnknownKey() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), atomicKvStateRequestStats)});
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setKvStateRegistry(kvStateRegistry);
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend(kvStateRegistry, 1, memoryStateBackend, dummyEnvironment);
        TestRegistryListener testRegistryListener = new TestRegistryListener();
        kvStateRegistry.registerListener(dummyEnvironment.getJobID(), testRegistryListener);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", IntSerializer.INSTANCE);
        valueStateDescriptor.setQueryable("vanilla");
        createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        byte[] serializeKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(1238283, IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
        Assert.assertTrue(testRegistryListener.registrationName.equals("vanilla"));
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 2147506629L, new KvStateInternalRequest(testRegistryListener.kvStateId, serializeKeyAndNamespace))});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        RequestFailure deserializeRequestFailure = MessageSerializer.deserializeRequestFailure(byteBuf);
        byteBuf.release();
        Assert.assertEquals(2147506629L, deserializeRequestFailure.getRequestId());
        Assert.assertTrue("Did not respond with expected failure cause", deserializeRequestFailure.getCause() instanceof UnknownKeyOrNamespaceException);
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumFailed());
    }

    @Test
    public void testFailureOnGetSerializedValue() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), atomicKvStateRequestStats)});
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 282872L, new KvStateInternalRequest(kvStateRegistry.registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "vanilla", new InternalKvState<Integer, VoidNamespace, Long>() { // from class: org.apache.flink.queryablestate.network.KvStateServerHandlerTest.1
            public TypeSerializer<Integer> getKeySerializer() {
                return IntSerializer.INSTANCE;
            }

            public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
                return VoidNamespaceSerializer.INSTANCE;
            }

            public TypeSerializer<Long> getValueSerializer() {
                return LongSerializer.INSTANCE;
            }

            public void setCurrentNamespace(VoidNamespace voidNamespace) {
            }

            public byte[] getSerializedValue(byte[] bArr, TypeSerializer<Integer> typeSerializer, TypeSerializer<VoidNamespace> typeSerializer2, TypeSerializer<Long> typeSerializer3) throws Exception {
                throw new RuntimeException("Expected test Exception");
            }

            public InternalKvState.StateIncrementalVisitor<Integer, VoidNamespace, Long> getStateIncrementalVisitor(int i) {
                throw new UnsupportedOperationException();
            }

            public void clear() {
            }
        }), new byte[0]))});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        RequestFailure deserializeRequestFailure = MessageSerializer.deserializeRequestFailure(byteBuf);
        byteBuf.release();
        Assert.assertTrue(deserializeRequestFailure.getCause().getMessage().contains("Expected test Exception"));
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumFailed());
    }

    @Test
    public void testCloseChannelOnExceptionCaught() throws Exception {
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new KvStateServerHandler(testServer, new KvStateRegistry(), new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), new AtomicKvStateRequestStats())});
        embeddedChannel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        Throwable deserializeServerFailure = MessageSerializer.deserializeServerFailure(byteBuf);
        byteBuf.release();
        Assert.assertTrue(deserializeServerFailure.getMessage().contains("Expected test Exception"));
        embeddedChannel.closeFuture().await(READ_TIMEOUT_MILLIS);
        Assert.assertFalse(embeddedChannel.isActive());
    }

    @Test
    public void testQueryExecutorShutDown() throws Throwable {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        KvStateServerImpl kvStateServerImpl = new KvStateServerImpl(InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), 1, 1, new KvStateRegistry(), new DisabledKvStateRequestStats());
        kvStateServerImpl.start();
        kvStateServerImpl.shutdown();
        Assert.assertTrue(kvStateServerImpl.getQueryExecutor().isTerminated());
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(kvStateServerImpl, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), atomicKvStateRequestStats)});
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setKvStateRegistry(kvStateRegistry);
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend(kvStateRegistry, 1, memoryStateBackend, dummyEnvironment);
        TestRegistryListener testRegistryListener = new TestRegistryListener();
        kvStateRegistry.registerListener(dummyEnvironment.getJobID(), testRegistryListener);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", IntSerializer.INSTANCE);
        valueStateDescriptor.setQueryable("vanilla");
        createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        Assert.assertTrue(testRegistryListener.registrationName.equals("vanilla"));
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 282872L, new KvStateInternalRequest(testRegistryListener.kvStateId, new byte[0]))});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        RequestFailure deserializeRequestFailure = MessageSerializer.deserializeRequestFailure(byteBuf);
        byteBuf.release();
        Assert.assertTrue(deserializeRequestFailure.getCause().getMessage().contains("RejectedExecutionException"));
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(1L, atomicKvStateRequestStats.getNumFailed());
        kvStateServerImpl.shutdown();
    }

    @Test
    public void testUnexpectedMessage() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), atomicKvStateRequestStats)});
        ByteBuf buffer = Unpooled.buffer(8);
        buffer.writeInt(4);
        buffer.writeInt(123238213);
        embeddedChannel.writeInbound(new Object[]{buffer});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        MessageSerializer.deserializeServerFailure(byteBuf);
        byteBuf.release();
        Assert.assertEquals(0L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(0L, atomicKvStateRequestStats.getNumFailed());
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeResponse(embeddedChannel.alloc(), 192L, new KvStateResponse(new byte[0]))});
        ByteBuf byteBuf2 = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf2.skipBytes(4);
        Assert.assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(byteBuf2));
        Throwable deserializeServerFailure = MessageSerializer.deserializeServerFailure(byteBuf2);
        byteBuf2.release();
        Assert.assertTrue("Unexpected failure cause " + deserializeServerFailure.getClass().getName(), deserializeServerFailure instanceof IllegalArgumentException);
        Assert.assertEquals(0L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(0L, atomicKvStateRequestStats.getNumFailed());
    }

    @Test
    public void testIncomingBufferIsRecycled() throws Exception {
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, new KvStateRegistry(), new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), new AtomicKvStateRequestStats())});
        ByteBuf serializeRequest = MessageSerializer.serializeRequest(embeddedChannel.alloc(), 282872L, new KvStateInternalRequest(new KvStateID(), new byte[0]));
        Assert.assertEquals(1L, serializeRequest.refCnt());
        embeddedChannel.writeInbound(new Object[]{serializeRequest});
        Assert.assertEquals("Buffer not recycled", 0L, serializeRequest.refCnt());
        ByteBuf buffer = embeddedChannel.alloc().buffer(8);
        buffer.writeInt(4);
        buffer.writeInt(4);
        Assert.assertEquals(1L, buffer.refCnt());
        embeddedChannel.writeInbound(new Object[]{buffer});
        Assert.assertEquals("Buffer not recycled", 0L, buffer.refCnt());
        embeddedChannel.finishAndReleaseAll();
    }

    @Test
    public void testSerializerMismatch() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), atomicKvStateRequestStats)});
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setKvStateRegistry(kvStateRegistry);
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend(kvStateRegistry, 1, memoryStateBackend, dummyEnvironment);
        TestRegistryListener testRegistryListener = new TestRegistryListener();
        kvStateRegistry.registerListener(dummyEnvironment.getJobID(), testRegistryListener);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", IntSerializer.INSTANCE);
        valueStateDescriptor.setQueryable("vanilla");
        ValueState partitionedState = createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        createKeyedStateBackend.setCurrentKey(99812822);
        partitionedState.update(712828289);
        byte[] serializeKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace("wrong-key-type", StringSerializer.INSTANCE, "wrong-namespace-type", StringSerializer.INSTANCE);
        byte[] serializeKeyAndNamespace2 = KvStateSerializer.serializeKeyAndNamespace(99812822, IntSerializer.INSTANCE, "wrong-namespace-type", StringSerializer.INSTANCE);
        Assert.assertTrue(testRegistryListener.registrationName.equals("vanilla"));
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 182828L, new KvStateInternalRequest(testRegistryListener.kvStateId, serializeKeyAndNamespace))});
        ByteBuf byteBuf = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(byteBuf));
        RequestFailure deserializeRequestFailure = MessageSerializer.deserializeRequestFailure(byteBuf);
        byteBuf.release();
        Assert.assertEquals(182828L, deserializeRequestFailure.getRequestId());
        Assert.assertTrue(deserializeRequestFailure.getCause().getMessage().contains("IOException"));
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 182829L, new KvStateInternalRequest(testRegistryListener.kvStateId, serializeKeyAndNamespace2))});
        ByteBuf byteBuf2 = (ByteBuf) readInboundBlocking(embeddedChannel);
        byteBuf2.skipBytes(4);
        Assert.assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(byteBuf2));
        RequestFailure deserializeRequestFailure2 = MessageSerializer.deserializeRequestFailure(byteBuf2);
        byteBuf2.release();
        Assert.assertEquals(182829L, deserializeRequestFailure2.getRequestId());
        Assert.assertTrue(deserializeRequestFailure2.getCause().getMessage().contains("IOException"));
        Assert.assertEquals(2L, atomicKvStateRequestStats.getNumRequests());
        Assert.assertEquals(2L, atomicKvStateRequestStats.getNumFailed());
    }

    @Test
    public void testChunkedResponse() throws Exception {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{getFrameDecoder(), new KvStateServerHandler(testServer, kvStateRegistry, new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), new AtomicKvStateRequestStats())});
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        dummyEnvironment.setKvStateRegistry(kvStateRegistry);
        AbstractKeyedStateBackend<Integer> createKeyedStateBackend = createKeyedStateBackend(kvStateRegistry, 1, memoryStateBackend, dummyEnvironment);
        TestRegistryListener testRegistryListener = new TestRegistryListener();
        kvStateRegistry.registerListener(dummyEnvironment.getJobID(), testRegistryListener);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("any", BytePrimitiveArraySerializer.INSTANCE);
        valueStateDescriptor.setQueryable("vanilla");
        ValueState partitionedState = createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        byte[] bArr = new byte[2 * embeddedChannel.config().getWriteBufferHighWaterMark()];
        byte b = 0;
        for (int i = 0; i < bArr.length; i++) {
            byte b2 = b;
            b = (byte) (b + 1);
            bArr[i] = b2;
        }
        createKeyedStateBackend.setCurrentKey(99812822);
        partitionedState.update(bArr);
        byte[] serializeKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(99812822, IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
        Assert.assertTrue(testRegistryListener.registrationName.equals("vanilla"));
        embeddedChannel.writeInbound(new Object[]{MessageSerializer.serializeRequest(embeddedChannel.alloc(), 2147666475L, new KvStateInternalRequest(testRegistryListener.kvStateId, serializeKeyAndNamespace))});
        Object readInboundBlocking = readInboundBlocking(embeddedChannel);
        Assert.assertTrue("Not ChunkedByteBuf", readInboundBlocking instanceof ChunkedByteBuf);
        ((ChunkedByteBuf) readInboundBlocking).close();
    }

    private Object readInboundBlocking(EmbeddedChannel embeddedChannel) throws InterruptedException, TimeoutException {
        Object obj = null;
        for (long j = 0; j < READ_TIMEOUT_MILLIS; j += 50) {
            Object readOutbound = embeddedChannel.readOutbound();
            obj = readOutbound;
            if (readOutbound != null) {
                break;
            }
            Thread.sleep(50L);
        }
        if (obj == null) {
            throw new TimeoutException();
        }
        return obj;
    }

    private ChannelHandler getFrameDecoder() {
        return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
    }

    private AbstractKeyedStateBackend<Integer> createKeyedStateBackend(KvStateRegistry kvStateRegistry, int i, AbstractStateBackend abstractStateBackend, DummyEnvironment dummyEnvironment) throws IOException {
        return abstractStateBackend.createKeyedStateBackend(dummyEnvironment, dummyEnvironment.getJobID(), "test_op", IntSerializer.INSTANCE, i, new KeyGroupRange(0, 0), kvStateRegistry.createTaskRegistry(dummyEnvironment.getJobID(), dummyEnvironment.getJobVertexId()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
    }
}
