package org.apache.ignite.internal.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.class */
public class RecoveryHandshakeTest {
    private static final short CONNECTION_ID = 1337;
    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
    private final Consumer<InNetworkObject> noMessageListener = inNetworkObject -> {
        Assertions.fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/network/netty/RecoveryHandshakeTest$MessageListener.class */
    public static class MessageListener implements Consumer<InNetworkObject> {
        private final String expectedMessage;
        private final AtomicBoolean flag;

        private MessageListener(String str, AtomicBoolean atomicBoolean) {
            this.expectedMessage = str;
            this.flag = atomicBoolean;
        }

        @Override // java.util.function.Consumer
        public void accept(InNetworkObject inNetworkObject) {
            if (!this.expectedMessage.equals(((TestMessage) inNetworkObject.message()).msg())) {
                Assertions.fail();
            } else {
                if (this.flag.compareAndSet(false, true)) {
                    return;
                }
                Assertions.fail();
            }
        }
    }

    @Test
    public void testHandshake() throws Exception {
        RecoveryDescriptorProvider createRecoveryDescriptorProvider = createRecoveryDescriptorProvider();
        RecoveryDescriptorProvider createRecoveryDescriptorProvider2 = createRecoveryDescriptorProvider();
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager = createRecoveryClientHandshakeManager(createRecoveryDescriptorProvider);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager = createRecoveryServerHandshakeManager(createRecoveryDescriptorProvider2);
        EmbeddedChannel embeddedChannel = setupChannel(createRecoveryClientHandshakeManager, this.noMessageListener);
        EmbeddedChannel embeddedChannel2 = setupChannel(createRecoveryServerHandshakeManager, this.noMessageListener);
        Assertions.assertTrue(embeddedChannel2.isActive());
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        exchangeClientToServer(embeddedChannel2, embeddedChannel);
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        Assertions.assertNull(embeddedChannel.readOutbound());
        Assertions.assertNull(embeddedChannel2.readOutbound());
        checkHandshakeCompleted(createRecoveryServerHandshakeManager);
        checkHandshakeCompleted(createRecoveryClientHandshakeManager);
        checkPipelineAfterHandshake(embeddedChannel2);
        checkPipelineAfterHandshake(embeddedChannel);
        Assertions.assertFalse(embeddedChannel2.finish());
        Assertions.assertFalse(embeddedChannel.finish());
    }

    @Test
    public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
        RecoveryDescriptorProvider createRecoveryDescriptorProvider = createRecoveryDescriptorProvider();
        RecoveryDescriptorProvider createRecoveryDescriptorProvider2 = createRecoveryDescriptorProvider();
        UUID randomUUID = UUID.randomUUID();
        addUnacknowledgedMessages(createRecoveryDescriptorProvider2.getRecoveryDescriptor("client", randomUUID, (short) 1337, true));
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager = createRecoveryClientHandshakeManager("client", randomUUID, createRecoveryDescriptorProvider);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager = createRecoveryServerHandshakeManager(createRecoveryDescriptorProvider2);
        AtomicReference atomicReference = new AtomicReference();
        EmbeddedChannel embeddedChannel = setupChannel(createRecoveryClientHandshakeManager, inNetworkObject -> {
            NetworkMessage message = inNetworkObject.message();
            Assertions.assertInstanceOf(TestMessage.class, message);
            atomicReference.set((TestMessage) message);
        });
        EmbeddedChannel embeddedChannel2 = setupChannel(createRecoveryServerHandshakeManager, this.noMessageListener);
        Assertions.assertTrue(embeddedChannel2.isActive());
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        exchangeClientToServer(embeddedChannel2, embeddedChannel);
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        Assertions.assertNull(embeddedChannel.readOutbound());
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        Assertions.assertNull(embeddedChannel2.readOutbound());
        Assertions.assertNotNull((TestMessage) atomicReference.get());
        checkHandshakeNotCompleted(createRecoveryServerHandshakeManager);
        checkHandshakeCompleted(createRecoveryClientHandshakeManager);
        exchangeClientToServer(embeddedChannel2, embeddedChannel);
        checkHandshakeCompleted(createRecoveryServerHandshakeManager);
        checkHandshakeCompleted(createRecoveryClientHandshakeManager);
        checkPipelineAfterHandshake(embeddedChannel2);
        checkPipelineAfterHandshake(embeddedChannel);
        Assertions.assertFalse(embeddedChannel2.finish());
        Assertions.assertFalse(embeddedChannel.finish());
    }

    @Test
    public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
        RecoveryDescriptorProvider createRecoveryDescriptorProvider = createRecoveryDescriptorProvider();
        RecoveryDescriptorProvider createRecoveryDescriptorProvider2 = createRecoveryDescriptorProvider();
        UUID randomUUID = UUID.randomUUID();
        addUnacknowledgedMessages(createRecoveryDescriptorProvider.getRecoveryDescriptor("server", randomUUID, (short) 1337, false));
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager = createRecoveryClientHandshakeManager(createRecoveryDescriptorProvider);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager = createRecoveryServerHandshakeManager("server", randomUUID, createRecoveryDescriptorProvider2);
        AtomicReference atomicReference = new AtomicReference();
        EmbeddedChannel embeddedChannel = setupChannel(createRecoveryClientHandshakeManager, this.noMessageListener);
        EmbeddedChannel embeddedChannel2 = setupChannel(createRecoveryServerHandshakeManager, inNetworkObject -> {
            NetworkMessage message = inNetworkObject.message();
            Assertions.assertInstanceOf(TestMessage.class, message);
            atomicReference.set((TestMessage) message);
        });
        Assertions.assertTrue(embeddedChannel2.isActive());
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        exchangeClientToServer(embeddedChannel2, embeddedChannel);
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        Assertions.assertNull(embeddedChannel2.readOutbound());
        exchangeClientToServer(embeddedChannel2, embeddedChannel);
        Assertions.assertNull(embeddedChannel.readOutbound());
        Assertions.assertNotNull((TestMessage) atomicReference.get());
        checkHandshakeCompleted(createRecoveryServerHandshakeManager);
        checkHandshakeNotCompleted(createRecoveryClientHandshakeManager);
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        checkHandshakeCompleted(createRecoveryServerHandshakeManager);
        checkHandshakeCompleted(createRecoveryClientHandshakeManager);
        checkPipelineAfterHandshake(embeddedChannel2);
        checkPipelineAfterHandshake(embeddedChannel);
        Assertions.assertFalse(embeddedChannel2.finish());
        Assertions.assertFalse(embeddedChannel.finish());
    }

    @Test
    public void testPairedRecoveryDescriptors() throws Exception {
        RecoveryDescriptorProvider createRecoveryDescriptorProvider = createRecoveryDescriptorProvider();
        RecoveryDescriptorProvider createRecoveryDescriptorProvider2 = createRecoveryDescriptorProvider();
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager = createRecoveryClientHandshakeManager("client", randomUUID, createRecoveryDescriptorProvider);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager = createRecoveryServerHandshakeManager("client", randomUUID, createRecoveryDescriptorProvider);
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager2 = createRecoveryClientHandshakeManager("server", randomUUID2, createRecoveryDescriptorProvider2);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager2 = createRecoveryServerHandshakeManager("server", randomUUID2, createRecoveryDescriptorProvider2);
        EmbeddedChannel embeddedChannel = setupChannel(createRecoveryClientHandshakeManager, this.noMessageListener);
        EmbeddedChannel embeddedChannel2 = setupChannel(createRecoveryServerHandshakeManager, this.noMessageListener);
        EmbeddedChannel embeddedChannel3 = setupChannel(createRecoveryClientHandshakeManager2, this.noMessageListener);
        EmbeddedChannel embeddedChannel4 = setupChannel(createRecoveryServerHandshakeManager2, this.noMessageListener);
        exchangeServerToClient(embeddedChannel2, embeddedChannel3);
        exchangeServerToClient(embeddedChannel4, embeddedChannel);
        exchangeClientToServer(embeddedChannel2, embeddedChannel3);
        exchangeClientToServer(embeddedChannel4, embeddedChannel);
        exchangeServerToClient(embeddedChannel2, embeddedChannel3);
        exchangeServerToClient(embeddedChannel4, embeddedChannel);
        Assertions.assertNotSame(createRecoveryClientHandshakeManager.recoveryDescriptor(), createRecoveryServerHandshakeManager.recoveryDescriptor());
        Assertions.assertNotSame(createRecoveryClientHandshakeManager2.recoveryDescriptor(), createRecoveryServerHandshakeManager2.recoveryDescriptor());
        Assertions.assertFalse(embeddedChannel.finish());
        Assertions.assertFalse(embeddedChannel2.finish());
        Assertions.assertFalse(embeddedChannel3.finish());
        Assertions.assertFalse(embeddedChannel4.finish());
    }

    @Test
    public void testExactlyOnceServer() throws Exception {
        testExactlyOnce(true);
    }

    @Test
    public void testExactlyOnceClient() throws Exception {
        testExactlyOnce(false);
    }

    private void testExactlyOnce(boolean z) throws Exception {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        RecoveryDescriptorProvider createRecoveryDescriptorProvider = createRecoveryDescriptorProvider();
        RecoveryDescriptorProvider createRecoveryDescriptorProvider2 = createRecoveryDescriptorProvider();
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager = createRecoveryClientHandshakeManager("client", randomUUID2, createRecoveryDescriptorProvider);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager = createRecoveryServerHandshakeManager("server", randomUUID, createRecoveryDescriptorProvider2);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MessageListener messageListener = new MessageListener("1", atomicBoolean);
        EmbeddedChannel embeddedChannel = setupChannel(createRecoveryClientHandshakeManager, z ? messageListener : this.noMessageListener);
        EmbeddedChannel embeddedChannel2 = setupChannel(createRecoveryServerHandshakeManager, z ? this.noMessageListener : messageListener);
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        exchangeClientToServer(embeddedChannel2, embeddedChannel);
        exchangeServerToClient(embeddedChannel2, embeddedChannel);
        EmbeddedChannel embeddedChannel3 = z ? embeddedChannel2 : embeddedChannel;
        embeddedChannel3.writeOutbound(new Object[]{new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList())});
        embeddedChannel3.writeOutbound(new Object[]{new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList())});
        if (z) {
            exchangeServerToClient(embeddedChannel2, embeddedChannel);
        } else {
            exchangeClientToServer(embeddedChannel2, embeddedChannel);
        }
        Assertions.assertTrue(atomicBoolean.get());
        if (z) {
            exchangeClientToServer(embeddedChannel2, embeddedChannel);
        } else {
            exchangeServerToClient(embeddedChannel2, embeddedChannel);
        }
        RecoveryClientHandshakeManager createRecoveryClientHandshakeManager2 = createRecoveryClientHandshakeManager("client", randomUUID2, createRecoveryDescriptorProvider);
        RecoveryServerHandshakeManager createRecoveryServerHandshakeManager2 = createRecoveryServerHandshakeManager("server", randomUUID, createRecoveryDescriptorProvider2);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        MessageListener messageListener2 = new MessageListener("2", atomicBoolean2);
        embeddedChannel.finishAndReleaseAll();
        embeddedChannel2.finishAndReleaseAll();
        EmbeddedChannel embeddedChannel4 = setupChannel(createRecoveryClientHandshakeManager2, z ? messageListener2 : this.noMessageListener);
        EmbeddedChannel embeddedChannel5 = setupChannel(createRecoveryServerHandshakeManager2, z ? this.noMessageListener : messageListener2);
        exchangeServerToClient(embeddedChannel5, embeddedChannel4);
        exchangeClientToServer(embeddedChannel5, embeddedChannel4);
        exchangeServerToClient(embeddedChannel5, embeddedChannel4);
        if (z) {
            exchangeServerToClient(embeddedChannel5, embeddedChannel4);
        } else {
            exchangeClientToServer(embeddedChannel5, embeddedChannel4);
        }
        if (z) {
            exchangeClientToServer(embeddedChannel5, embeddedChannel4);
        } else {
            exchangeServerToClient(embeddedChannel5, embeddedChannel4);
        }
        Assertions.assertNull(embeddedChannel5.readOutbound());
        Assertions.assertNull(embeddedChannel4.readOutbound());
        Assertions.assertTrue(atomicBoolean2.get());
        Assertions.assertFalse(embeddedChannel5.finish());
        Assertions.assertFalse(embeddedChannel4.finish());
    }

    private void checkPipelineAfterHandshake(EmbeddedChannel embeddedChannel) {
        Assertions.assertNull(embeddedChannel.pipeline().get("handshake-handler"));
    }

    private void checkHandshakeNotCompleted(HandshakeManager handshakeManager) {
        CompletableFuture handshakeFuture = handshakeManager.handshakeFuture();
        Assertions.assertFalse(handshakeFuture.isDone());
        Assertions.assertFalse(handshakeFuture.isCompletedExceptionally());
        Assertions.assertFalse(handshakeFuture.isCancelled());
    }

    private void checkHandshakeCompleted(HandshakeManager handshakeManager) {
        CompletableFuture handshakeFuture = handshakeManager.handshakeFuture();
        Assertions.assertTrue(handshakeFuture.isDone());
        Assertions.assertFalse(handshakeFuture.isCompletedExceptionally());
        Assertions.assertFalse(handshakeFuture.isCancelled());
    }

    private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
        recoveryDescriptor.add(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("test").build(), Collections.emptyList()));
    }

    private void exchangeServerToClient(EmbeddedChannel embeddedChannel, EmbeddedChannel embeddedChannel2) {
        embeddedChannel2.writeInbound(new Object[]{(ByteBuf) embeddedChannel.readOutbound()});
    }

    private void exchangeClientToServer(EmbeddedChannel embeddedChannel, EmbeddedChannel embeddedChannel2) {
        embeddedChannel.writeInbound(new Object[]{(ByteBuf) embeddedChannel2.readOutbound()});
    }

    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> consumer) throws Exception {
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(false, false, new ChannelHandler[0]);
        PipelineUtils.setup(embeddedChannel.pipeline(), new PerSessionSerializationService(new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext())), handshakeManager, consumer);
        embeddedChannel.register();
        return embeddedChannel;
    }

    private UserObjectSerializationContext createUserObjectSerializationContext() {
        ClassDescriptorRegistry classDescriptorRegistry = new ClassDescriptorRegistry();
        ClassDescriptorFactory classDescriptorFactory = new ClassDescriptorFactory(classDescriptorRegistry);
        return new UserObjectSerializationContext(classDescriptorRegistry, classDescriptorFactory, new DefaultUserObjectMarshaller(classDescriptorRegistry, classDescriptorFactory));
    }

    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider recoveryDescriptorProvider) {
        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), recoveryDescriptorProvider);
    }

    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String str, UUID uuid, RecoveryDescriptorProvider recoveryDescriptorProvider) {
        return new RecoveryClientHandshakeManager(uuid, str, (short) 1337, MESSAGE_FACTORY, recoveryDescriptorProvider);
    }

    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider recoveryDescriptorProvider) {
        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), recoveryDescriptorProvider);
    }

    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String str, UUID uuid, RecoveryDescriptorProvider recoveryDescriptorProvider) {
        return new RecoveryServerHandshakeManager(uuid, str, MESSAGE_FACTORY, recoveryDescriptorProvider);
    }

    private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
        return new DefaultRecoveryDescriptorProvider();
    }
}
