/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.ChunkedByteBuf;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.messages.RequestFailure;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore(value="KvStateServerHandlerTest is unstable. See FLINK-13553 for more information. Since the community does not have time to work on QS, we decided to temporarily ignore this test case in orderto maintain build stability.")
public class KvStateServerHandlerTest
extends TestLogger {
    private static KvStateServerImpl testServer;
    private static final long READ_TIMEOUT_MILLIS = 10000L;

    @BeforeClass
    public static void setup() {
        try {
            testServer = new KvStateServerImpl(InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), Integer.valueOf(1), Integer.valueOf(1), new KvStateRegistry(), (KvStateRequestStats)new DisabledKvStateRequestStats());
            testServer.start();
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
    }

    @AfterClass
    public static void tearDown() throws Exception {
        testServer.shutdown();
    }

    @Test
    public void testSimpleQuery() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> backend = this.createKeyedStateBackend(registry, numKeyGroups, (AbstractStateBackend)abstractBackend, dummyEnv);
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener(dummyEnv.getJobID(), (KvStateRegistryListener)registryListener);
        int expectedValue = 712828289;
        int key = 99812822;
        backend.setCurrentKey((Object)key);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        state.update((Object)expectedValue);
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        long requestId = 2147666475L;
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_RESULT, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        long deserRequestId = MessageSerializer.getRequestId((ByteBuf)buf);
        KvStateResponse response = (KvStateResponse)serializer.deserializeResponse(buf);
        buf.release();
        Assert.assertEquals((long)requestId, (long)deserRequestId);
        int actualValue = (Integer)KvStateSerializer.deserializeValue((byte[])response.getContent(), (TypeSerializer)IntSerializer.INSTANCE);
        Assert.assertEquals((long)expectedValue, (long)actualValue);
        Assert.assertEquals((String)stats.toString(), (long)1L, (long)stats.getNumRequests());
        long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30L, TimeUnit.SECONDS);
        while (stats.getNumSuccessful() != 1L && System.nanoTime() <= deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)stats.toString(), (long)1L, (long)stats.getNumSuccessful());
    }

    @Test
    public void testQueryUnknownKvStateID() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        long requestId = 2147666475L;
        KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        RequestFailure response = MessageSerializer.deserializeRequestFailure((ByteBuf)buf);
        buf.release();
        Assert.assertEquals((long)requestId, (long)response.getRequestId());
        Assert.assertTrue((String)"Did not respond with expected failure cause", (boolean)(response.getCause() instanceof UnknownKvStateIdException));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testQueryUnknownKey() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> backend = this.createKeyedStateBackend(registry, numKeyGroups, (AbstractStateBackend)abstractBackend, dummyEnv);
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener(dummyEnv.getJobID(), (KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)1238283, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        long requestId = 2147506629L;
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        RequestFailure response = MessageSerializer.deserializeRequestFailure((ByteBuf)buf);
        buf.release();
        Assert.assertEquals((long)requestId, (long)response.getRequestId());
        Assert.assertTrue((String)"Did not respond with expected failure cause", (boolean)(response.getCause() instanceof UnknownKeyOrNamespaceException));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testFailureOnGetSerializedValue() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        InternalKvState<Integer, VoidNamespace, Long> kvState = new InternalKvState<Integer, VoidNamespace, Long>(){

            public TypeSerializer<Integer> getKeySerializer() {
                return IntSerializer.INSTANCE;
            }

            public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
                return VoidNamespaceSerializer.INSTANCE;
            }

            public TypeSerializer<Long> getValueSerializer() {
                return LongSerializer.INSTANCE;
            }

            public void setCurrentNamespace(VoidNamespace namespace) {
            }

            public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<Integer> safeKeySerializer, TypeSerializer<VoidNamespace> safeNamespaceSerializer, TypeSerializer<Long> safeValueSerializer) throws Exception {
                throw new RuntimeException("Expected test Exception");
            }

            public InternalKvState.StateIncrementalVisitor<Integer, VoidNamespace, Long> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
                throw new UnsupportedOperationException();
            }

            public void clear() {
            }
        };
        KvStateID kvStateId = registry.registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "vanilla", (InternalKvState)kvState);
        KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)282872L, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        RequestFailure response = MessageSerializer.deserializeRequestFailure((ByteBuf)buf);
        buf.release();
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("Expected test Exception"));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testCloseChannelOnExceptionCaught() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        channel.pipeline().fireExceptionCaught((Throwable)new RuntimeException("Expected test Exception"));
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.SERVER_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        Throwable response = MessageSerializer.deserializeServerFailure((ByteBuf)buf);
        buf.release();
        Assert.assertTrue((boolean)response.getMessage().contains("Expected test Exception"));
        channel.closeFuture().await(10000L);
        Assert.assertFalse((boolean)channel.isActive());
    }

    @Test
    public void testQueryExecutorShutDown() throws Throwable {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerImpl localTestServer = new KvStateServerImpl(InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), Integer.valueOf(1), Integer.valueOf(1), new KvStateRegistry(), (KvStateRequestStats)new DisabledKvStateRequestStats());
        localTestServer.start();
        localTestServer.shutdown();
        Assert.assertTrue((boolean)localTestServer.getQueryExecutor().isTerminated());
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(localTestServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> backend = this.createKeyedStateBackend(registry, numKeyGroups, (AbstractStateBackend)abstractBackend, dummyEnv);
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener(dummyEnv.getJobID(), (KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)282872L, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        RequestFailure response = MessageSerializer.deserializeRequestFailure((ByteBuf)buf);
        buf.release();
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("RejectedExecutionException"));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
        localTestServer.shutdown();
    }

    @Test
    public void testUnexpectedMessage() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        ByteBuf unexpectedMessage = Unpooled.buffer((int)8);
        unexpectedMessage.writeInt(4);
        unexpectedMessage.writeInt(123238213);
        channel.writeInbound(new Object[]{unexpectedMessage});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.SERVER_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        Throwable response = MessageSerializer.deserializeServerFailure((ByteBuf)buf);
        buf.release();
        Assert.assertEquals((long)0L, (long)stats.getNumRequests());
        Assert.assertEquals((long)0L, (long)stats.getNumFailed());
        KvStateResponse stateResponse = new KvStateResponse(new byte[0]);
        unexpectedMessage = MessageSerializer.serializeResponse((ByteBufAllocator)channel.alloc(), (long)192L, (MessageBody)stateResponse);
        channel.writeInbound(new Object[]{unexpectedMessage});
        buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.SERVER_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        response = MessageSerializer.deserializeServerFailure((ByteBuf)buf);
        buf.release();
        Assert.assertTrue((String)("Unexpected failure cause " + response.getClass().getName()), (boolean)(response instanceof IllegalArgumentException));
        Assert.assertEquals((long)0L, (long)stats.getNumRequests());
        Assert.assertEquals((long)0L, (long)stats.getNumFailed());
    }

    @Test
    public void testIncomingBufferIsRecycled() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)282872L, (MessageBody)request);
        Assert.assertEquals((long)1L, (long)serRequest.refCnt());
        channel.writeInbound(new Object[]{serRequest});
        Assert.assertEquals((String)"Buffer not recycled", (long)0L, (long)serRequest.refCnt());
        ByteBuf unexpected = channel.alloc().buffer(8);
        unexpected.writeInt(4);
        unexpected.writeInt(4);
        Assert.assertEquals((long)1L, (long)unexpected.refCnt());
        channel.writeInbound(new Object[]{unexpected});
        Assert.assertEquals((String)"Buffer not recycled", (long)0L, (long)unexpected.refCnt());
        channel.finishAndReleaseAll();
    }

    @Test
    public void testSerializerMismatch() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> backend = this.createKeyedStateBackend(registry, numKeyGroups, (AbstractStateBackend)abstractBackend, dummyEnv);
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener(dummyEnv.getJobID(), (KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        int key = 99812822;
        backend.setCurrentKey((Object)key);
        state.update((Object)712828289);
        byte[] wrongKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)"wrong-key-type", (TypeSerializer)StringSerializer.INSTANCE, (Object)"wrong-namespace-type", (TypeSerializer)StringSerializer.INSTANCE);
        byte[] wrongNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)"wrong-namespace-type", (TypeSerializer)StringSerializer.INSTANCE);
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)182828L, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        RequestFailure response = MessageSerializer.deserializeRequestFailure((ByteBuf)buf);
        buf.release();
        Assert.assertEquals((long)182828L, (long)response.getRequestId());
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("IOException"));
        request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
        serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)182829L, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)MessageType.REQUEST_FAILURE, (Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
        response = MessageSerializer.deserializeRequestFailure((ByteBuf)buf);
        buf.release();
        Assert.assertEquals((long)182829L, (long)response.getRequestId());
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("IOException"));
        Assert.assertEquals((long)2L, (long)stats.getNumRequests());
        Assert.assertEquals((long)2L, (long)stats.getNumFailed());
    }

    @Test
    public void testChunkedResponse() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> backend = this.createKeyedStateBackend(registry, numKeyGroups, (AbstractStateBackend)abstractBackend, dummyEnv);
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener(dummyEnv.getJobID(), (KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        desc.setQueryable("vanilla");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()];
        int current = 0;
        for (int i = 0; i < bytes.length; ++i) {
            int n = current;
            current = (byte)(current + 1);
            bytes[i] = n;
        }
        int key = 99812822;
        backend.setCurrentKey((Object)key);
        state.update((Object)bytes);
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        long requestId = 2147666475L;
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
        ByteBuf serRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (MessageBody)request);
        channel.writeInbound(new Object[]{serRequest});
        Object msg = this.readInboundBlocking(channel);
        Assert.assertTrue((String)"Not ChunkedByteBuf", (boolean)(msg instanceof ChunkedByteBuf));
        ((ChunkedByteBuf)msg).close();
    }

    private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
        long sleepMillis = 50L;
        Object msg = null;
        for (long sleptMillis = 0L; sleptMillis < 10000L && (msg = channel.readOutbound()) == null; sleptMillis += 50L) {
            Thread.sleep(50L);
        }
        if (msg == null) {
            throw new TimeoutException();
        }
        return msg;
    }

    private ChannelHandler getFrameDecoder() {
        return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
    }

    private AbstractKeyedStateBackend<Integer> createKeyedStateBackend(KvStateRegistry registry, int numKeyGroups, AbstractStateBackend abstractBackend, DummyEnvironment dummyEnv) throws IOException {
        return abstractBackend.createKeyedStateBackend((Environment)dummyEnv, dummyEnv.getJobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
    }

    static class TestRegistryListener
    implements KvStateRegistryListener {
        volatile JobVertexID jobVertexID;
        volatile KeyGroupRange keyGroupIndex;
        volatile String registrationName;
        volatile KvStateID kvStateId;

        TestRegistryListener() {
        }

        public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
            this.jobVertexID = jobVertexId;
            this.keyGroupIndex = keyGroupRange;
            this.registrationName = registrationName;
            this.kvStateId = kvStateId;
        }

        public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
        }
    }
}

