/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.sun.security.auth.module.UnixSystem;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.impl.ChannelPool;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.cluster.messaging.impl.ProtocolVersion;
import io.atomix.utils.Managed;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.junit.AutoCloseResources;
import io.camunda.zeebe.test.util.junit.RegressionTest;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.netty.channel.Channel;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.agrona.collections.MutableReference;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssertAlternative;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;

@AutoCloseResources
final class NettyMessagingServiceTest {
    private static final String CLUSTER_NAME = "zeebe";
    private static final int UID_COLUMN = 7;

    NettyMessagingServiceTest() {
    }

    private MessagingConfig defaultConfig() {
        return new MessagingConfig().setShutdownQuietPeriod(Duration.ofMillis(50L));
    }

    private String nextSubject() {
        return UUID.randomUUID().toString();
    }

    private ManagedMessagingService createMessagingServiceWithPool(MutableReference<ChannelPool> poolRef) {
        Address otherAddress = this.newAddress();
        MessagingConfig config = this.defaultConfig();
        return new NettyMessagingService(CLUSTER_NAME, otherAddress, config, ProtocolVersion.V2, factory -> {
            ChannelPool pool = new ChannelPool(factory, 8);
            poolRef.set((Object)pool);
            return pool;
        });
    }

    private NettyMessagingService newMessagingService() {
        return new NettyMessagingService(CLUSTER_NAME, this.newAddress(), this.defaultConfig());
    }

    private Address newAddress() {
        return Address.from((int)SocketUtil.getNextAddress().getPort());
    }

    private void startMessagingServices(NettyMessagingService ... services) {
        CompletableFuture.allOf((CompletableFuture[])((Stream)Stream.of(services).parallel()).map(Managed::start).toArray(CompletableFuture[]::new)).join();
    }

    private long udpSocketCount() throws IOException {
        long pid = ProcessHandle.current().pid();
        Path udpPath = Path.of("/proc/" + pid + "/net/udp", new String[0]);
        Path udp6Path = Path.of("/proc/" + pid + "/net/udp6", new String[0]);
        return this.udpSocketCount(udpPath) + this.udpSocketCount(udp6Path);
    }

    private long udpSocketCount(Path path) throws IOException {
        List<String> lines = Files.readAllLines(path);
        long uid = new UnixSystem().getUid();
        lines.remove(0);
        return lines.stream().filter(line -> {
            String[] columns = line.trim().split("\\s+");
            return Long.parseLong(columns[7]) == uid;
        }).count();
    }

    @Nested
    final class DualInstanceTest {
        @AutoCloseResources.AutoCloseResource
        private final NettyMessagingService netty1;
        @AutoCloseResources.AutoCloseResource
        private final NettyMessagingService netty2;

        DualInstanceTest() {
            this.netty1 = NettyMessagingServiceTest.this.newMessagingService();
            this.netty2 = NettyMessagingServiceTest.this.newMessagingService();
        }

        @BeforeEach
        void beforeEach() {
            NettyMessagingServiceTest.this.startMessagingServices(this.netty1, this.netty2);
        }

        @Test
        void testSendAndReceive() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            AtomicBoolean handlerInvoked = new AtomicBoolean(false);
            AtomicReference request = new AtomicReference();
            AtomicReference sender = new AtomicReference();
            BiFunction<Address, byte[], byte[]> handler = (ep, data) -> {
                handlerInvoked.set(true);
                sender.set(ep);
                request.set(data);
                return "hello there".getBytes();
            };
            this.netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes());
            Assertions.assertThat((byte[])"hello there".getBytes()).isEqualTo(response.join());
            Assertions.assertThat((boolean)handlerInvoked.get()).isTrue();
            Assertions.assertThat((byte[])((byte[])request.get())).asString().isEqualTo("hello world");
            Assertions.assertThat((boolean)Arrays.equals((byte[])request.get(), "hello world".getBytes())).isTrue();
            Assertions.assertThat((Object)((Address)sender.get()).tryResolveAddress()).isEqualTo((Object)this.netty1.address().tryResolveAddress());
        }

        @Test
        void testSendAsync() {
            Address invalidAddress = Address.from((String)"127.0.0.1", (int)5007);
            String subject = NettyMessagingServiceTest.this.nextSubject();
            CountDownLatch latch1 = new CountDownLatch(1);
            CompletableFuture response = this.netty1.sendAsync(this.netty2.address(), subject, "hello world".getBytes());
            response.whenComplete((r, e) -> {
                Assertions.assertThat((Throwable)e).isNull();
                latch1.countDown();
            });
            Uninterruptibles.awaitUninterruptibly((CountDownLatch)latch1);
            CountDownLatch latch2 = new CountDownLatch(1);
            response = this.netty1.sendAsync(invalidAddress, subject, "hello world".getBytes());
            response.whenComplete((r, e) -> {
                Assertions.assertThat((Throwable)e).isInstanceOf(ConnectException.class);
                latch2.countDown();
            });
            Uninterruptibles.awaitUninterruptibly((CountDownLatch)latch2);
        }

        @Test
        void shouldCompleteExistingRequestFutureExceptionallyWhenMessagingServiceIsClosed() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), true, Duration.ofSeconds(5L));
            this.netty1.stop().join();
            Assertions.assertThat((CompletableFuture)response).isCompletedExceptionally();
        }

        @Test
        public void shouldCompleteExistingRequestWithKeepAliveExceptionallyWhenMessagingServiceIsClosed() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), true, Duration.ofSeconds(5L));
            this.netty1.stop().join();
            Assertions.assertThat((CompletableFuture)response).isCompletedExceptionally();
        }

        @Test
        void shouldCompleteFutureExceptionallyIfMessagingServiceIsClosedInBetween() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            CompletableFuture stopFuture = this.netty1.stop();
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), false, Duration.ofSeconds(5L));
            stopFuture.join();
            Assertions.assertThat((CompletableFuture)response).isCompletedExceptionally();
        }

        @Test
        void shouldCompleteRequestWithKeepAliveExceptionallyIfMessagingServiceIsClosedInBetween() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            CompletableFuture stopFuture = this.netty1.stop();
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), true, Duration.ofSeconds(5L));
            stopFuture.join();
            Assertions.assertThat((CompletableFuture)response).isCompletedExceptionally();
        }

        @Test
        void shouldCompleteFutureExceptionallyIfMessagingServiceHasAlreadyClosed() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty1.stop().join();
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), false, Duration.ofSeconds(5L));
            Assertions.assertThat((CompletableFuture)response).isCompletedExceptionally();
        }

        @Test
        void shouldCompleteRequestWithKeepAliveExceptionallyIfMessagingServiceHasAlreadyClosed() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty1.stop().join();
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), true, Duration.ofSeconds(5L));
            Assertions.assertThat((CompletableFuture)response).isCompletedExceptionally();
        }

        @Test
        void testTransientSendAndReceive() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            AtomicBoolean handlerInvoked = new AtomicBoolean(false);
            AtomicReference request = new AtomicReference();
            AtomicReference sender = new AtomicReference();
            BiFunction<Address, byte[], byte[]> handler = (ep, data) -> {
                handlerInvoked.set(true);
                sender.set(ep);
                request.set(data);
                return "hello there".getBytes();
            };
            this.netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), false);
            Assertions.assertThat((byte[])"hello there".getBytes()).isEqualTo(response.join());
            Assertions.assertThat((boolean)handlerInvoked.get()).isTrue();
            Assertions.assertThat((byte[])((byte[])request.get())).asString().isEqualTo("hello world");
            Assertions.assertThat((Object)((Address)sender.get()).tryResolveAddress()).isEqualTo((Object)this.netty1.address().tryResolveAddress());
        }

        @Test
        void testSendAndReceiveWithFixedTimeout() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            BiFunction<Address, byte[], CompletableFuture> handler = (ep, payload) -> new CompletableFuture();
            this.netty2.registerHandler(subject, handler);
            try {
                this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), Duration.ofSeconds(1L)).join();
                org.junit.jupiter.api.Assertions.fail();
            }
            catch (CompletionException e) {
                Assertions.assertThat((Throwable)e.getCause()).isInstanceOf(TimeoutException.class);
            }
        }

        @Test
        @Disabled
        void testSendAndReceiveWithExecutor() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
            ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
            AtomicReference handlerThreadName = new AtomicReference();
            AtomicReference completionThreadName = new AtomicReference();
            CountDownLatch latch = new CountDownLatch(1);
            BiFunction<Address, byte[], byte[]> handler = (ep, data) -> {
                handlerThreadName.set(Thread.currentThread().getName());
                try {
                    latch.await();
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    org.junit.jupiter.api.Assertions.fail((String)"InterruptedException");
                }
                return "hello there".getBytes();
            };
            this.netty2.registerHandler(subject, handler, (Executor)handlerExecutor);
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), (Executor)completionExecutor);
            response.whenComplete((r, e) -> completionThreadName.set(Thread.currentThread().getName()));
            latch.countDown();
            Assertions.assertThat((byte[])"hello there".getBytes()).isEqualTo(response.join());
            Assertions.assertThat((String)((String)completionThreadName.get())).isEqualTo("completion-thread");
            Assertions.assertThat((String)((String)handlerThreadName.get())).isEqualTo("handler-thread");
        }

        @Test
        void testNoRemoteHandlerException() {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            MessagingException.NoRemoteHandler expectedException = new MessagingException.NoRemoteHandler(subject);
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.NoRemoteHandler.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testNoRemoteHandlerExceptionEmptyStringValue() {
            String subject = "";
            MessagingException.NoRemoteHandler expectedException = new MessagingException.NoRemoteHandler(null);
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), "", "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.NoRemoteHandler.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testRemoteHandlerFailure() {
            String exceptionMessage = "foo bar";
            MessagingException.RemoteHandlerFailure expectedException = new MessagingException.RemoteHandlerFailure("foo bar");
            BiFunction<Address, byte[], byte[]> handler = (ep, data) -> {
                throw new RuntimeException("foo bar");
            };
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testRemoteHandlerFailureNullValue() {
            MessagingException.RemoteHandlerFailure expectedException = new MessagingException.RemoteHandlerFailure(null);
            BiFunction<Address, byte[], byte[]> handler = (ep, data) -> {
                throw new RuntimeException();
            };
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testRemoteHandlerFailureEmptyStringValue() {
            String exceptionMessage = "";
            MessagingException.RemoteHandlerFailure expectedException = new MessagingException.RemoteHandlerFailure(null);
            BiFunction<Address, byte[], byte[]> handler = (ep, data) -> {
                throw new RuntimeException("");
            };
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testCompletableRemoteHandlerFailure() {
            String exceptionMessage = "foo bar";
            MessagingException.RemoteHandlerFailure expectedException = new MessagingException.RemoteHandlerFailure("foo bar");
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, (address, bytes) -> CompletableFuture.failedFuture(new RuntimeException("foo bar")));
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testCompletableRemoteHandlerFailureNullValue() {
            MessagingException.RemoteHandlerFailure expectedException = new MessagingException.RemoteHandlerFailure(null);
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, (address, bytes) -> CompletableFuture.failedFuture(new RuntimeException()));
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void testCompletableRemoteHandlerFailureEmptyStringValue() {
            String exceptionMessage = "";
            MessagingException.RemoteHandlerFailure expectedException = new MessagingException.RemoteHandlerFailure(null);
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, (address, bytes) -> CompletableFuture.failedFuture(new RuntimeException("")));
            CompletableFuture response = this.netty1.sendAndReceive(this.netty2.address(), subject, "fail".getBytes());
            ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class)).withMessage(expectedException.getMessage());
        }

        @Test
        void shouldCloseChannelAfterTimeout() throws Exception {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            MutableReference poolRef = new MutableReference();
            Duration timeoutOnCreate = Duration.ofSeconds(10L);
            try (ManagedMessagingService nettyWithOwnPool = NettyMessagingServiceTest.this.createMessagingServiceWithPool((MutableReference<ChannelPool>)poolRef);){
                ChannelPool channelPool = (ChannelPool)poolRef.get();
                nettyWithOwnPool.start().join();
                this.netty2.registerHandler(subject, (address, bytes) -> new byte[0], Runnable::run);
                nettyWithOwnPool.sendAndReceive(this.netty2.address(), subject, "get channel".getBytes(), true, timeoutOnCreate).join();
                Channel originalChannel = (Channel)channelPool.getChannel(this.netty2.address(), subject).join();
                this.netty2.unregisterHandler(subject);
                this.netty2.registerHandler(subject, (address, bytes) -> new CompletableFuture());
                CompletableFuture response = nettyWithOwnPool.sendAndReceive(this.netty2.address(), subject, "fail".getBytes(), true, Duration.ofSeconds(1L));
                ((ThrowableAssertAlternative)Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(15L)).withThrowableThat().havingRootCause().isInstanceOf(TimeoutException.class)).withMessageContaining("timed out in");
                Assertions.assertThat((Future)originalChannel.closeFuture()).succeedsWithin(Duration.ofSeconds(15L));
            }
        }

        @Test
        void shouldCreateNewChannelOnNewRequestAfterTimeout() throws Exception {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            MutableReference poolRef = new MutableReference();
            Duration timeoutOnCreate = Duration.ofSeconds(10L);
            try (ManagedMessagingService nettyWithOwnPool = NettyMessagingServiceTest.this.createMessagingServiceWithPool((MutableReference<ChannelPool>)poolRef);){
                ChannelPool channelPool = (ChannelPool)poolRef.get();
                nettyWithOwnPool.start().join();
                this.netty2.registerHandler(subject, (address, bytes) -> new byte[0], Runnable::run);
                nettyWithOwnPool.sendAndReceive(this.netty2.address(), subject, "get channel".getBytes(), true, timeoutOnCreate).join();
                Channel originalChannel = (Channel)channelPool.getChannel(this.netty2.address(), subject).join();
                this.netty2.unregisterHandler(subject);
                this.netty2.registerHandler(subject, (address, bytes) -> new CompletableFuture());
                CompletableFuture response = nettyWithOwnPool.sendAndReceive(this.netty2.address(), subject, "fail".getBytes(), true, Duration.ofSeconds(1L));
                Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(15L)).withThrowableThat().havingRootCause().isInstanceOf(TimeoutException.class);
                Assertions.assertThat((Future)originalChannel.closeFuture()).succeedsWithin(Duration.ofSeconds(15L));
                this.netty2.unregisterHandler(subject);
                this.netty2.registerHandler(subject, (address, bytes) -> new byte[0], Runnable::run);
                nettyWithOwnPool.sendAndReceive(this.netty2.address(), subject, "success".getBytes(), true, timeoutOnCreate);
                Channel newChannel = (Channel)channelPool.getChannel(this.netty2.address(), subject).join();
                Assertions.assertThat((Comparable)newChannel).isNotEqualTo((Object)originalChannel);
            }
        }

        @EnabledOnOs(value={OS.LINUX})
        @RegressionTest(value="https://github.com/camunda/camunda/issues/14837")
        void shouldNotLeakUdpSockets() throws IOException {
            int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.netty2.registerHandler(subject, (address, bytes) -> new byte[0], MoreExecutors.directExecutor());
            for (int i = 0; i < maxConnections * 4; ++i) {
                this.netty1.sendAndReceive(this.netty2.address(), subject, "hello world".getBytes(), false).join();
            }
            Assertions.assertThat((long)NettyMessagingServiceTest.this.udpSocketCount()).isLessThanOrEqualTo((long)maxConnections * 2L);
        }
    }

    @Nested
    final class SingleInstanceTest {
        SingleInstanceTest() {
        }

        @Test
        void testSendAsyncToUnresolvable() throws Exception {
            String subject = NettyMessagingServiceTest.this.nextSubject();
            try (NettyMessagingService service = NettyMessagingServiceTest.this.newMessagingService();){
                Address unresolvable = Address.from((String)"unknown.local", (int)service.address().port());
                Assertions.assertThat((CompletableFuture)service.start()).succeedsWithin(Duration.ofSeconds(5L));
                CompletableFuture response = service.sendAsync(unresolvable, subject, "hello world".getBytes());
                Assertions.assertThat((CompletableFuture)response).failsWithin(Duration.ofSeconds(10L));
            }
        }

        @Test
        void shouldNotBindToAdvertisedAddress() throws Exception {
            Address bindingAddress = NettyMessagingServiceTest.this.newAddress();
            MessagingConfig config = NettyMessagingServiceTest.this.defaultConfig();
            config.setInterfaces(List.of(bindingAddress.host()));
            config.setPort(Integer.valueOf(bindingAddress.port()));
            Address nonBindableAddress = new Address("invalid.host", 1);
            try (NettyMessagingService service = new NettyMessagingService("test", nonBindableAddress, config);){
                Assertions.assertThat((CompletableFuture)service.start()).succeedsWithin(Duration.ofSeconds(5L));
                Assertions.assertThat((Collection)service.bindingAddresses()).contains((Object[])new Address[]{bindingAddress});
                Assertions.assertThat((Object)service.address()).isEqualTo((Object)nonBindableAddress);
            }
        }
    }

    @Nested
    final class VersionTest {
        @AutoCloseResources.AutoCloseResource
        private final NettyMessagingService nettyv11;
        @AutoCloseResources.AutoCloseResource
        private final NettyMessagingService nettyv12;
        @AutoCloseResources.AutoCloseResource
        private final NettyMessagingService nettyv21;
        @AutoCloseResources.AutoCloseResource
        private final NettyMessagingService nettyv22;

        VersionTest() {
            this.nettyv11 = NettyMessagingServiceTest.this.newMessagingService();
            this.nettyv12 = NettyMessagingServiceTest.this.newMessagingService();
            this.nettyv21 = NettyMessagingServiceTest.this.newMessagingService();
            this.nettyv22 = NettyMessagingServiceTest.this.newMessagingService();
        }

        @BeforeEach
        void beforeEach() {
            NettyMessagingServiceTest.this.startMessagingServices(this.nettyv11, this.nettyv12, this.nettyv21, this.nettyv22);
        }

        @Test
        void testV1() throws Exception {
            byte[] payload = "Hello world!".getBytes();
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.nettyv11.registerHandler(subject, (address, bytes) -> CompletableFuture.completedFuture(bytes));
            byte[] response = (byte[])this.nettyv12.sendAndReceive(this.nettyv11.address(), subject, payload).get(10L, TimeUnit.SECONDS);
            Assertions.assertThat((byte[])response).isEqualTo((Object)payload);
        }

        @Test
        void testV2() throws Exception {
            byte[] payload = "Hello world!".getBytes();
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.nettyv21.registerHandler(subject, (address, bytes) -> CompletableFuture.completedFuture(bytes));
            byte[] response = (byte[])this.nettyv22.sendAndReceive(this.nettyv21.address(), subject, payload).get(10L, TimeUnit.SECONDS);
            Assertions.assertThat((byte[])response).isEqualTo((Object)payload);
        }

        @Test
        void testVersionNegotiation() throws Exception {
            byte[] payload = "Hello world!".getBytes();
            String subject = NettyMessagingServiceTest.this.nextSubject();
            this.nettyv11.registerHandler(subject, (address, bytes) -> CompletableFuture.completedFuture(bytes));
            byte[] response = (byte[])this.nettyv21.sendAndReceive(this.nettyv11.address(), subject, payload).get(10L, TimeUnit.SECONDS);
            Assertions.assertThat((byte[])response).isEqualTo((Object)payload);
            subject = NettyMessagingServiceTest.this.nextSubject();
            this.nettyv22.registerHandler(subject, (address, bytes) -> CompletableFuture.completedFuture(bytes));
            response = (byte[])this.nettyv12.sendAndReceive(this.nettyv22.address(), subject, payload).get(10L, TimeUnit.SECONDS);
            Assertions.assertThat((byte[])response).isEqualTo((Object)payload);
        }
    }
}

