/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class AbstractServerTest
extends TestLogger {
    /**
     * JUnit rule through which a test declares the exception type and message it is expected to
     * throw. It starts out expecting no exception; {@code testServerInitializationFailure}
     * configures it.
     */
    @Rule
    public ExpectedException expectedEx = ExpectedException.none();

    /**
     * Verifies that a server fails to start when the only port it is offered is already in use.
     *
     * <p>The first server binds to a port taken from a range. The second server is then offered
     * exactly that port and nothing else, so starting it is expected to fail with a
     * {@link FlinkRuntimeException} whose message names "Test Server 2".
     *
     * @throws Throwable the expected start-up failure of the second server (checked by the
     *     {@code expectedEx} rule), or any unexpected failure
     */
    @Test
    public void testServerInitializationFailure() throws Throwable {
        // Registered up front: the rule inspects whatever exception escapes this method.
        expectedEx.expect(FlinkRuntimeException.class);
        expectedEx.expectMessage(
                "Unable to start Test Server 2. All ports in provided range are occupied.");

        // Offer the first server a range instead of one fixed port. With a single hard-coded
        // port, an unrelated process holding it would make *server 1* fail to start and the test
        // would break for the wrong reason.
        List<Integer> candidatePorts =
                IntStream.rangeClosed(7777, 7900).boxed().collect(Collectors.toList());

        try (TestServer server1 =
                new TestServer(
                        "Test Server 1",
                        new DisabledKvStateRequestStats(),
                        candidatePorts.iterator())) {
            server1.start();

            // The second server gets only the port that the first server is currently holding.
            int occupiedPort = server1.getServerAddress().getPort();
            try (TestServer server2 =
                    new TestServer(
                            "Test Server 2",
                            new DisabledKvStateRequestStats(),
                            Collections.singletonList(occupiedPort).iterator())) {
                server2.start();
            }
        }
    }

    /**
     * Verifies that two servers offered the same port range both start on a port inside that
     * range, and that a client can exchange one request/response with each of them.
     *
     * <p>While everything is open, the test checks the response payloads and the
     * connection/request counters of both servers and the client. After all three have been
     * closed, it checks that the connection counters are back to zero while the client's
     * request, success and failure counters are unchanged.
     *
     * @throws Throwable if starting, querying or closing any of the components fails
     */
    @Test
    public void testPortRangeSuccess() throws Throwable {
        AtomicKvStateRequestStats serverStats1 = new AtomicKvStateRequestStats();
        AtomicKvStateRequestStats serverStats2 = new AtomicKvStateRequestStats();
        AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();

        // Both bounds are inclusive.
        final int portRangeStart = 7777;
        final int portRangeEnd = 7900;
        List<Integer> portList =
                IntStream.rangeClosed(portRangeStart, portRangeEnd)
                        .boxed()
                        .collect(Collectors.toList());

        MessageSerializer<TestMessage, TestMessage> clientSerializer =
                new MessageSerializer<>(
                        new TestMessage.TestMessageDeserializer(),
                        new TestMessage.TestMessageDeserializer());

        // Each server receives its own iterator over the same list of candidate ports.
        try (TestServer server1 = new TestServer("Test Server 1", serverStats1, portList.iterator());
                TestServer server2 = new TestServer("Test Server 2", serverStats2, portList.iterator());
                TestClient client = new TestClient("Test Client", 1, clientSerializer, clientStats)) {

            server1.start();
            int port1 = server1.getServerAddress().getPort();
            Assert.assertTrue(port1 >= portRangeStart && port1 <= portRangeEnd);

            server2.start();
            int port2 = server2.getServerAddress().getPort();
            Assert.assertTrue(port2 >= portRangeStart && port2 <= portRangeEnd);

            TestMessage response1 =
                    client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join();
            Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage());

            TestMessage response2 =
                    client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join();
            Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage());

            Assert.assertEquals(1L, serverStats1.getNumConnections());
            Assert.assertEquals(1L, serverStats2.getNumConnections());

            Assert.assertEquals(2L, clientStats.getNumConnections());
            Assert.assertEquals(0L, clientStats.getNumFailed());
            Assert.assertEquals(2L, clientStats.getNumSuccessful());
            Assert.assertEquals(2L, clientStats.getNumRequests());
        }

        // Everything has been closed by now: no connections remain, request totals are unchanged.
        Assert.assertEquals(0L, serverStats1.getNumConnections());
        Assert.assertEquals(0L, serverStats2.getNumConnections());

        Assert.assertEquals(0L, clientStats.getNumConnections());
        Assert.assertEquals(0L, clientStats.getNumFailed());
        Assert.assertEquals(2L, clientStats.getNumSuccessful());
        Assert.assertEquals(2L, clientStats.getNumRequests());
    }

    /**
     * Message body used by the tests; it wraps a single non-null string.
     *
     * <p>{@link #serialize()} writes a 4-byte length followed by the string's bytes in
     * {@code ConfigConstants.DEFAULT_CHARSET}; {@link TestMessageDeserializer} reads that layout
     * back.
     */
    private static class TestMessage extends MessageBody {

        private final String message;

        /**
         * @param message the text carried by this message; must not be {@code null}
         */
        TestMessage(String message) {
            this.message = Preconditions.checkNotNull(message);
        }

        /** Returns the text carried by this message. */
        public String getMessage() {
            return message;
        }

        /** Encodes the message as an int length prefix followed by the encoded text. */
        public byte[] serialize() {
            final byte[] payload = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
            final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + payload.length);
            buffer.putInt(payload.length);
            buffer.put(payload);
            return buffer.array();
        }

        /** Reads a {@link TestMessage} from the layout produced by {@link TestMessage#serialize()}. */
        public static class TestMessageDeserializer implements MessageDeserializer<TestMessage> {

            /**
             * Reads the int length prefix and, when it is positive, that many bytes of text.
             * A non-positive length yields a message with an empty string.
             */
            public TestMessage deserializeMessage(ByteBuf buf) {
                final int length = buf.readInt();
                if (length <= 0) {
                    return new TestMessage("");
                }

                final byte[] payload = new byte[length];
                buf.readBytes(payload);
                return new TestMessage(new String(payload, ConfigConstants.DEFAULT_CHARSET));
            }
        }
    }

    /**
     * Minimal server used by the tests. Its handler answers every request with the server name,
     * a dash and the request's text. Closing it shuts the server down and asserts that the
     * shutdown completed.
     */
    private static class TestServer extends AbstractServerBase<TestMessage, TestMessage>
            implements AutoCloseable {

        private final KvStateRequestStats requestStats;

        /**
         * @param name server name, echoed back as the prefix of every response
         * @param stats statistics tracker handed to the request handler
         * @param bindPort candidate ports the server may bind to
         * @throws UnknownHostException if the local host address cannot be resolved
         */
        TestServer(String name, KvStateRequestStats stats, Iterator<Integer> bindPort)
                throws UnknownHostException {
            // NOTE(review): the two trailing 1s are presumably the event-loop and query thread
            // counts -- confirm against AbstractServerBase.
            super(name, InetAddress.getLocalHost(), bindPort, 1, 1);
            this.requestStats = stats;
        }

        /**
         * Creates the handler that replies to each request with
         * {@code <server name>-<request text>}.
         */
        public AbstractServerHandler<TestMessage, TestMessage> initializeHandler() {
            MessageSerializer<TestMessage, TestMessage> serializer =
                    new MessageSerializer<>(
                            new TestMessage.TestMessageDeserializer(),
                            new TestMessage.TestMessageDeserializer());

            return new AbstractServerHandler<TestMessage, TestMessage>(
                    this, serializer, requestStats) {

                public CompletableFuture<TestMessage> handleRequest(
                        long requestId, TestMessage request) {
                    // getServerName() resolves on the handler itself, as in the original code.
                    TestMessage response =
                            new TestMessage(getServerName() + '-' + request.getMessage());
                    return CompletableFuture.completedFuture(response);
                }

                public CompletableFuture<Void> shutdown() {
                    // Nothing to release; report an already-completed shutdown.
                    return CompletableFuture.completedFuture(null);
                }
            };
        }

        /**
         * Shuts the server down, waits for the shutdown to finish and asserts that both the query
         * executor and the event group have terminated.
         */
        @Override
        public void close() throws Exception {
            shutdownServer().get();
            Assert.assertTrue(getQueryExecutor().isTerminated());
            Assert.assertTrue(isEventGroupShutdown());
        }
    }

    /**
     * Client used by the tests. It adds {@link AutoCloseable} support so it can be managed by
     * try-with-resources; closing it shuts the client down and asserts the event group stopped.
     */
    private static class TestClient extends Client<TestMessage, TestMessage>
            implements AutoCloseable {

        /** Passes all arguments straight through to the {@link Client} constructor. */
        TestClient(
                String clientName,
                int numEventLoopThreads,
                MessageSerializer<TestMessage, TestMessage> serializer,
                KvStateRequestStats stats) {
            super(clientName, numEventLoopThreads, serializer, stats);
        }

        /** Waits for the client shutdown to finish, then checks the event group is shut down. */
        @Override
        public void close() throws Exception {
            shutdown().join();
            Assert.assertTrue(isEventGroupShutdown());
        }
    }
}

