package org.apache.phoenix.shaded.org.apache.omid.tso;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.phoenix.shaded.org.apache.omid.metrics.NullMetricsProvider;
import org.apache.phoenix.shaded.org.apache.omid.proto.TSOProto;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/phoenix/shaded/org/apache/omid/tso/TestTSOChannelHandlerNetty.class */
/**
 * Tests for {@link TSOChannelHandler} that exercise it over real Netty sockets on the local host.
 *
 * <p>Every test gets a fresh handler configured to listen on {@link #TSO_PORT} and backed by a
 * mocked {@link RequestProcessor}; the handler is closed again after each test.
 */
public class TestTSOChannelHandlerNetty {
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);

    /** Host the test clients connect to. */
    private static final String TSO_HOST = "localhost";

    /** Port the handler under test is configured to listen on. */
    private static final int TSO_PORT = 1434;

    /** Pause between two checks of the channel group size while waiting for it to change. */
    private static final long POLL_INTERVAL_MS = 5L;

    @Mock
    private RequestProcessor requestProcessor;
    private TSOChannelHandler channelHandler;

    /** Creates the mocks and a not-yet-connected handler configured for {@link #TSO_PORT}. */
    @BeforeMethod
    public void beforeTestMethod() {
        MockitoAnnotations.initMocks(this);
        TSOServerConfig config = new TSOServerConfig();
        config.setPort(TSO_PORT);
        this.channelHandler = new TSOChannelHandler(config, this.requestProcessor, new NullMetricsProvider());
    }

    /**
     * Closes the handler created for the test that just ran.
     *
     * @throws IOException if closing the handler fails
     */
    @AfterMethod
    public void afterTestMethod() throws IOException {
        this.channelHandler.close();
    }

    /**
     * Walks the handler through reconnect / closeConnection / close and checks the listening
     * channel state and the channel group size after every step, including repeated calls.
     */
    @Test(timeOut = 10000)
    public void testMainAPI() throws Exception {
        // Nothing is created before the first reconnect.
        Assert.assertNull(this.channelHandler.listeningChannel);
        Assert.assertNull(this.channelHandler.channelGroup);

        this.channelHandler.reconnect();
        assertListeningWithGroupSize(1);
        InetSocketAddress boundAddress = (InetSocketAddress) this.channelHandler.listeningChannel.getLocalAddress();
        Assert.assertEquals(boundAddress.getPort(), TSO_PORT);

        this.channelHandler.closeConnection();
        assertNotListening();

        // Closing an already closed connection leaves the state unchanged.
        this.channelHandler.closeConnection();
        assertNotListening();

        this.channelHandler.reconnect();
        assertListeningWithGroupSize(1);

        // Reconnecting while connected still ends with a single open channel in the group.
        this.channelHandler.reconnect();
        assertListeningWithGroupSize(1);

        this.channelHandler.close();
        assertNotListening();

        // After close(), a reconnect attempt may be rejected with a ChannelException; when it is,
        // the handler must remain closed. A reconnect that does not throw is tolerated here.
        try {
            this.channelHandler.reconnect();
        } catch (ChannelException e) {
            assertNotListening();
        }
    }

    /**
     * Connects Netty clients to the handler and checks that client channels are added to and
     * removed from the channel group as the handler is reconnected, disconnected and closed.
     */
    @Test(timeOut = 10000)
    public void testNettyConnectionToTSOFromClient() throws Exception {
        ClientBootstrap bootstrap = createNettyClientBootstrap();

        // The handler is not listening yet, so the first connection attempt must fail.
        ChannelFuture refused = connectAndAwait(bootstrap);
        Assert.assertFalse(refused.isSuccess());

        this.channelHandler.reconnect();
        assertListeningWithGroupSize(1);

        // A successful client connection shows up as a second channel in the group...
        ChannelFuture firstClient = connectAndAwait(bootstrap);
        Assert.assertTrue(firstClient.isSuccess());
        Assert.assertTrue(firstClient.getChannel().isConnected());
        awaitChannelGroupSize(2);

        // ...and disappears from it once the client closes its side.
        firstClient.getChannel().close().await();
        awaitChannelGroupSize(1);

        ChannelFuture secondClient = connectAndAwait(bootstrap);
        Assert.assertTrue(secondClient.isSuccess());
        awaitChannelGroupSize(2);

        // Closing the server-side connection must also bring the client channel down.
        this.channelHandler.closeConnection();
        assertNotListening();
        TimeUnit.SECONDS.sleep(1L); // give the client time to observe the remote close
        Assert.assertFalse(secondClient.getChannel().isOpen());

        this.channelHandler.reconnect();
        assertListeningWithGroupSize(1);

        ChannelFuture thirdClient = connectAndAwait(bootstrap);
        Assert.assertTrue(thirdClient.isSuccess());
        awaitChannelGroupSize(2);

        // Reconnecting drops the existing client connection as well.
        this.channelHandler.reconnect();
        assertListeningWithGroupSize(1);
        TimeUnit.SECONDS.sleep(1L); // give the client time to observe the remote close
        Assert.assertFalse(thirdClient.getChannel().isOpen());

        this.channelHandler.close();
        assertNotListening();
    }

    /**
     * Sends a handshake followed by timestamp, commit and fence requests over a client channel
     * and verifies that each one reaches the matching {@link RequestProcessor} method.
     */
    @Test(timeOut = 10000)
    public void testNettyChannelWriting() throws Exception {
        this.channelHandler.reconnect();

        ChannelFuture connection = connectAndAwait(createNettyClientBootstrap());
        Assert.assertTrue(connection.isSuccess());
        Assert.assertTrue(connection.getChannel().isConnected());
        Channel channel = connection.getChannel();
        awaitChannelGroupSize(2);

        // NOTE(review): m5146build() looks like a decompiler-renamed build(); kept as found
        // because its definition is outside this file.
        TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
        handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().m5146build());
        // The handshake write is intentionally not awaited, exactly as before.
        channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());

        testWritingTimestampRequest(channel);
        testWritingCommitRequest(channel);
        testWritingFenceRequest(channel);
    }

    /**
     * Writes a timestamp request and verifies that only {@code timestampRequest} is invoked.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write to complete
     */
    private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
        Mockito.reset(this.requestProcessor);

        TSOProto.Request.Builder request = TSOProto.Request.newBuilder();
        request.setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build());
        channel.write(request.build()).await();

        Mockito.verify(this.requestProcessor, Mockito.timeout(100).times(1))
                .timestampRequest(Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).never())
                .commitRequest(
                        Matchers.anyLong(),
                        Matchers.anyCollectionOf(Long.class),
                        Matchers.anyCollectionOf(Long.class),
                        Matchers.anyBoolean(),
                        Matchers.any(Channel.class),
                        Matchers.any(MonitoringContextImpl.class));
    }

    /**
     * Writes a commit request (start timestamp 666, one cell id) and verifies that only
     * {@code commitRequest} is invoked, with that start timestamp and {@code false} as the
     * boolean argument.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write to complete
     */
    private void testWritingCommitRequest(Channel channel) throws InterruptedException {
        Mockito.reset(this.requestProcessor);

        TSOProto.CommitRequest.Builder commit = TSOProto.CommitRequest.newBuilder();
        commit.setStartTimestamp(666L);
        commit.addCellId(666L);

        // NOTE(review): m5177build() looks like a decompiler-renamed build(); kept as found
        // because its definition is outside this file.
        TSOProto.Request.Builder request = TSOProto.Request.newBuilder();
        request.setCommitRequest(commit.m5177build());
        Assert.assertTrue(request.build().hasCommitRequest());
        channel.write(request.build()).await();

        Mockito.verify(this.requestProcessor, Mockito.timeout(100).never())
                .timestampRequest(Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).times(1))
                .commitRequest(
                        Matchers.eq(666L),
                        Matchers.anyCollectionOf(Long.class),
                        Matchers.anyCollectionOf(Long.class),
                        Matchers.eq(false),
                        Matchers.any(Channel.class),
                        Matchers.any(MonitoringContextImpl.class));
    }

    /**
     * Writes a fence request for table id 666 and verifies that {@code fenceRequest} is invoked
     * with that id while {@code timestampRequest} is not invoked.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write to complete
     */
    private void testWritingFenceRequest(Channel channel) throws InterruptedException {
        Mockito.reset(this.requestProcessor);

        TSOProto.FenceRequest.Builder fence = TSOProto.FenceRequest.newBuilder();
        fence.setTableId(666L);

        TSOProto.Request.Builder request = TSOProto.Request.newBuilder();
        request.setFenceRequest(fence.build());
        Assert.assertTrue(request.build().hasFenceRequest());
        channel.write(request.build()).await();

        Mockito.verify(this.requestProcessor, Mockito.timeout(100).never())
                .timestampRequest(Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).times(1))
                .fenceRequest(Matchers.eq(666L), Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
    }

    /**
     * Builds a Netty client bootstrap whose pipeline frames messages with a 4-byte length
     * prefix, decodes {@code TSOProto.Response} messages and encodes outgoing protobufs.
     *
     * <p>NOTE(review): the two cached thread pools created here are never shut down by this
     * class; consider releasing the bootstrap's resources once that is known to be safe.
     *
     * @return a bootstrap ready to {@code connect}
     */
    private ClientBootstrap createNettyClientBootstrap() {
        NioClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()),
                1);
        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("connectTimeoutMillis", 100);

        ChannelPipeline pipeline = bootstrap.getPipeline();
        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 4));
        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
        pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
        pipeline.addLast("protobufencoder", new ProtobufEncoder());
        pipeline.addLast("handler", new SimpleChannelHandler() {
            @Override
            public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
                LOG.info("Channel {} connected", ctx.getChannel());
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) throws Exception {
                LOG.error("Error on channel {}", ctx.getChannel(), event.getCause());
            }
        });
        return bootstrap;
    }

    /**
     * Starts a connection to the handler's host and port and blocks until the attempt has
     * completed, successfully or not.
     *
     * @param bootstrap client bootstrap used to open the connection
     * @return the completed connect future; callers check {@code isSuccess()} themselves
     * @throws InterruptedException if interrupted while waiting for the attempt to finish
     */
    private ChannelFuture connectAndAwait(ClientBootstrap bootstrap) throws InterruptedException {
        ChannelFuture attempt = bootstrap.connect(new InetSocketAddress(TSO_HOST, TSO_PORT));
        // Block on the future instead of spinning on isDone().
        attempt.await();
        return attempt;
    }

    /**
     * Waits until the handler's channel group holds exactly {@code expectedSize} channels,
     * sleeping between checks rather than spinning. The wait itself has no deadline; it relies
     * on the timeout of the calling test to bound it.
     *
     * @param expectedSize number of channels the group must reach
     * @throws InterruptedException if interrupted while sleeping between checks
     */
    private void awaitChannelGroupSize(int expectedSize) throws InterruptedException {
        while (this.channelHandler.channelGroup.size() != expectedSize) {
            TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Asserts that the listening channel is open and the channel group has the given size.
     *
     * @param expectedGroupSize number of channels expected in the group
     */
    private void assertListeningWithGroupSize(int expectedGroupSize) {
        Assert.assertTrue(this.channelHandler.listeningChannel.isOpen());
        Assert.assertEquals(this.channelHandler.channelGroup.size(), expectedGroupSize);
    }

    /** Asserts that the listening channel is closed and the channel group is empty. */
    private void assertNotListening() {
        Assert.assertFalse(this.channelHandler.listeningChannel.isOpen());
        Assert.assertEquals(this.channelHandler.channelGroup.size(), 0);
    }
}
