package org.apache.flink.queryablestate.network;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerTest.class */
class AbstractServerTest {

    /* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerTest$TestClient.class */
    private static class TestClient extends Client<TestMessage, TestMessage> implements AutoCloseable {
        TestClient(String str, int i, MessageSerializer<TestMessage, TestMessage> messageSerializer, KvStateRequestStats kvStateRequestStats) {
            super(str, i, messageSerializer, kvStateRequestStats);
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            shutdown().join();
            Assertions.assertThat(isEventGroupShutdown()).isTrue();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerTest$TestMessage.class */
    public static class TestMessage extends MessageBody {
        private final String message;

        /* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerTest$TestMessage$TestMessageDeserializer.class */
        public static class TestMessageDeserializer implements MessageDeserializer<TestMessage> {
            /* renamed from: deserializeMessage, reason: merged with bridge method [inline-methods] */
            public TestMessage m7deserializeMessage(ByteBuf byteBuf) {
                int readInt = byteBuf.readInt();
                String str = "";
                if (readInt > 0) {
                    byte[] bArr = new byte[readInt];
                    byteBuf.readBytes(bArr);
                    str = new String(bArr, ConfigConstants.DEFAULT_CHARSET);
                }
                return new TestMessage(str);
            }
        }

        TestMessage(String str) {
            this.message = (String) Preconditions.checkNotNull(str);
        }

        public String getMessage() {
            return this.message;
        }

        public byte[] serialize() {
            byte[] bytes = this.message.getBytes(ConfigConstants.DEFAULT_CHARSET);
            return ByteBuffer.allocate(bytes.length + 4).putInt(bytes.length).put(bytes).array();
        }
    }

    /* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerTest$TestServer.class */
    private static class TestServer extends AbstractServerBase<TestMessage, TestMessage> implements AutoCloseable {
        private final KvStateRequestStats requestStats;

        TestServer(String str, KvStateRequestStats kvStateRequestStats, Iterator<Integer> it) throws UnknownHostException {
            super(str, InetAddress.getLocalHost().getHostName(), it, 1, 1);
            this.requestStats = kvStateRequestStats;
        }

        public AbstractServerHandler<TestMessage, TestMessage> initializeHandler() {
            return new AbstractServerHandler<TestMessage, TestMessage>(this, new MessageSerializer(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), this.requestStats) { // from class: org.apache.flink.queryablestate.network.AbstractServerTest.TestServer.1
                public CompletableFuture<TestMessage> handleRequest(long j, TestMessage testMessage) {
                    return CompletableFuture.completedFuture(new TestMessage(getServerName() + '-' + testMessage.getMessage()));
                }

                public CompletableFuture<Void> shutdown() {
                    return CompletableFuture.completedFuture(null);
                }
            };
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            shutdownServer().get();
            Assertions.assertThat(getQueryExecutor().isTerminated()).isTrue();
            Assertions.assertThat(isEventGroupShutdown()).isTrue();
        }
    }

    AbstractServerTest() {
    }

    @Test
    void testServerInitializationFailure() throws Throwable {
        TestServer testServer = new TestServer("Test Server 1", new DisabledKvStateRequestStats(), Collections.singletonList(0).iterator());
        Throwable th = null;
        try {
            testServer.start();
            TestServer testServer2 = new TestServer("Test Server 2", new DisabledKvStateRequestStats(), Collections.singletonList(Integer.valueOf(testServer.getServerAddress().getPort())).iterator());
            Throwable th2 = null;
            try {
                try {
                    Assertions.assertThatThrownBy(() -> {
                        testServer2.start();
                    }).hasMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
                    if (testServer2 != null) {
                        if (0 != 0) {
                            try {
                                testServer2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            testServer2.close();
                        }
                    }
                    if (testServer != null) {
                        if (0 == 0) {
                            testServer.close();
                            return;
                        }
                        try {
                            testServer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (testServer2 != null) {
                    if (th2 != null) {
                        try {
                            testServer2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        testServer2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (testServer != null) {
                if (0 != 0) {
                    try {
                        testServer.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    testServer.close();
                }
            }
            throw th8;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    void testPortRangeSuccess() throws Throwable {
        AtomicKvStateRequestStats atomicKvStateRequestStats = new AtomicKvStateRequestStats();
        AtomicKvStateRequestStats atomicKvStateRequestStats2 = new AtomicKvStateRequestStats();
        AtomicKvStateRequestStats atomicKvStateRequestStats3 = new AtomicKvStateRequestStats();
        List list = (List) IntStream.range(7777, 7901).boxed().collect(Collectors.toList());
        TestServer testServer = new TestServer("Test Server 1", atomicKvStateRequestStats, list.iterator());
        Throwable th = null;
        try {
            TestServer testServer2 = new TestServer("Test Server 2", atomicKvStateRequestStats2, list.iterator());
            Throwable th2 = null;
            try {
                TestClient testClient = new TestClient("Test Client", 1, new MessageSerializer(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), atomicKvStateRequestStats3);
                Throwable th3 = null;
                try {
                    try {
                        testServer.start();
                        Assertions.assertThat(testServer.getServerAddress().getPort()).isGreaterThanOrEqualTo(7777);
                        Assertions.assertThat(testServer.getServerAddress().getPort()).isLessThanOrEqualTo(7900);
                        testServer2.start();
                        Assertions.assertThat(testServer2.getServerAddress().getPort()).isGreaterThanOrEqualTo(7777);
                        Assertions.assertThat(testServer2.getServerAddress().getPort()).isLessThanOrEqualTo(7900);
                        Assertions.assertThat(((TestMessage) testClient.sendRequest(testServer.getServerAddress(), new TestMessage("ping")).join()).getMessage()).isEqualTo(testServer.getServerName() + "-ping");
                        Assertions.assertThat(((TestMessage) testClient.sendRequest(testServer2.getServerAddress(), new TestMessage("pong")).join()).getMessage()).isEqualTo(testServer2.getServerName() + "-pong");
                        Assertions.assertThat(atomicKvStateRequestStats.getNumConnections()).isEqualTo(1L);
                        Assertions.assertThat(atomicKvStateRequestStats2.getNumConnections()).isEqualTo(1L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumConnections()).isEqualTo(2L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumFailed()).isEqualTo(0L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumSuccessful()).isEqualTo(2L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumRequests()).isEqualTo(2L);
                        if (testClient != null) {
                            if (0 != 0) {
                                try {
                                    testClient.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                testClient.close();
                            }
                        }
                        if (testServer2 != null) {
                            if (0 != 0) {
                                try {
                                    testServer2.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                testServer2.close();
                            }
                        }
                        Assertions.assertThat(atomicKvStateRequestStats.getNumConnections()).isEqualTo(0L);
                        Assertions.assertThat(atomicKvStateRequestStats2.getNumConnections()).isEqualTo(0L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumConnections()).isEqualTo(0L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumFailed()).isEqualTo(0L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumSuccessful()).isEqualTo(2L);
                        Assertions.assertThat(atomicKvStateRequestStats3.getNumRequests()).isEqualTo(2L);
                    } finally {
                    }
                } catch (Throwable th6) {
                    if (testClient != null) {
                        if (th3 != null) {
                            try {
                                testClient.close();
                            } catch (Throwable th7) {
                                th3.addSuppressed(th7);
                            }
                        } else {
                            testClient.close();
                        }
                    }
                    throw th6;
                }
            } catch (Throwable th8) {
                if (testServer2 != null) {
                    if (0 != 0) {
                        try {
                            testServer2.close();
                        } catch (Throwable th9) {
                            th2.addSuppressed(th9);
                        }
                    } else {
                        testServer2.close();
                    }
                }
                throw th8;
            }
        } finally {
            if (testServer != null) {
                if (0 != 0) {
                    try {
                        testServer.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    testServer.close();
                }
            }
        }
    }
}
