package org.apache.flink.queryablestate.network;

import java.nio.channels.ClosedChannelException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/queryablestate/network/KvStateClientHandlerTest.class */
class KvStateClientHandlerTest {

    /* loaded from: input_file:org/apache/flink/queryablestate/network/KvStateClientHandlerTest$TestingClientHandlerCallback.class */
    private static class TestingClientHandlerCallback implements ClientHandlerCallback {
        private int onRequestCnt;
        private long onRequestId;
        private MessageBody onRequestBody;
        private int onRequestFailureCnt;
        private long onRequestFailureId;
        private Throwable onRequestFailureBody;
        private int onFailureCnt;
        private Throwable onFailureBody;

        private TestingClientHandlerCallback() {
        }

        public void onRequestResult(long j, MessageBody messageBody) {
            this.onRequestCnt++;
            this.onRequestId = j;
            this.onRequestBody = messageBody;
        }

        public void onRequestFailure(long j, Throwable th) {
            this.onRequestFailureCnt++;
            this.onRequestFailureId = j;
            this.onRequestFailureBody = th;
        }

        public void onFailure(Throwable th) {
            this.onFailureCnt++;
            this.onFailureBody = th;
        }

        public void reset() {
            this.onRequestCnt = 0;
            this.onRequestId = -1L;
            this.onRequestBody = null;
            this.onRequestFailureCnt = 0;
            this.onRequestFailureId = -1L;
            this.onRequestFailureBody = null;
            this.onFailureCnt = 0;
            this.onFailureBody = null;
        }
    }

    KvStateClientHandlerTest() {
    }

    @Test
    void testReadCallbacksAndBufferRecycling() throws Exception {
        TestingClientHandlerCallback testingClientHandlerCallback = new TestingClientHandlerCallback();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new ClientHandler("Test Client", new MessageSerializer(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), testingClientHandlerCallback)});
        ByteBuf serializeResponse = MessageSerializer.serializeResponse(embeddedChannel.alloc(), 1222112277L, new KvStateResponse(new byte[0]));
        serializeResponse.skipBytes(4);
        testingClientHandlerCallback.reset();
        embeddedChannel.writeInbound(new Object[]{serializeResponse});
        Assertions.assertThat(testingClientHandlerCallback.onRequestCnt).isEqualTo(1);
        Assertions.assertThat(testingClientHandlerCallback.onRequestId).isEqualTo(1222112277L);
        Assertions.assertThat(testingClientHandlerCallback.onRequestBody).isInstanceOf(KvStateResponse.class);
        Assertions.assertThat(serializeResponse.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled", new Object[0]);
        ByteBuf serializeRequestFailure = MessageSerializer.serializeRequestFailure(embeddedChannel.alloc(), 1222112278L, new RuntimeException("Expected test Exception"));
        serializeRequestFailure.skipBytes(4);
        testingClientHandlerCallback.reset();
        embeddedChannel.writeInbound(new Object[]{serializeRequestFailure});
        Assertions.assertThat(testingClientHandlerCallback.onRequestFailureCnt).isEqualTo(1);
        Assertions.assertThat(testingClientHandlerCallback.onRequestFailureId).isEqualTo(1222112278L);
        Assertions.assertThat(testingClientHandlerCallback.onRequestFailureBody).isInstanceOf(RuntimeException.class);
        Assertions.assertThat(serializeRequestFailure.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled", new Object[0]);
        ByteBuf serializeServerFailure = MessageSerializer.serializeServerFailure(embeddedChannel.alloc(), new RuntimeException("Expected test Exception"));
        serializeServerFailure.skipBytes(4);
        testingClientHandlerCallback.reset();
        embeddedChannel.writeInbound(new Object[]{serializeServerFailure});
        Assertions.assertThat(testingClientHandlerCallback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat(testingClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
        ByteBuf writeInt = embeddedChannel.alloc().buffer(4).writeInt(1223823);
        testingClientHandlerCallback.reset();
        embeddedChannel.writeInbound(new Object[]{writeInt});
        Assertions.assertThat(testingClientHandlerCallback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat(testingClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
        Assertions.assertThat(writeInt.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled", new Object[0]);
        testingClientHandlerCallback.reset();
        embeddedChannel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
        Assertions.assertThat(testingClientHandlerCallback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat(testingClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class);
        testingClientHandlerCallback.reset();
        embeddedChannel.pipeline().fireChannelInactive();
        Assertions.assertThat(testingClientHandlerCallback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat(testingClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class);
    }
}
