package io.joynr.integration;

import io.joynr.dispatching.subscription.MulticastIdUtil;
import io.joynr.exceptions.JoynrDelayMessageException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.exceptions.JoynrRuntimeException;
import io.joynr.integration.ProxyErrorsTest;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.IMessagingStub;
import io.joynr.messaging.JoynrMessageProcessor;
import io.joynr.messaging.MessagingQos;
import io.joynr.messaging.MulticastReceiverRegistrar;
import io.joynr.messaging.SuccessAction;
import io.joynr.messaging.inprocess.InProcessAddress;
import io.joynr.messaging.inprocess.InProcessMessagingStub;
import io.joynr.messaging.inprocess.InProcessMessagingStubFactory;
import io.joynr.messaging.websocket.IWebSocketMessagingSkeleton;
import io.joynr.provider.Promise;
import io.joynr.proxy.MessageIdCallback;
import io.joynr.proxy.ReplyContext;
import io.joynr.proxy.StatelessAsyncIdCalculator;
import io.joynr.smrf.EncodingException;
import io.joynr.smrf.UnsuppportedVersionException;
import io.joynr.util.JoynrUtil;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import joynr.ImmutableMessage;
import joynr.Message;
import joynr.MulticastSubscriptionRequest;
import joynr.MutableMessage;
import joynr.OnChangeSubscriptionQos;
import joynr.OneWayRequest;
import joynr.Reply;
import joynr.SubscriptionRequest;
import joynr.SubscriptionStop;
import joynr.system.RoutingTypes.Address;
import joynr.system.RoutingTypes.RoutingTypesUtil;
import joynr.system.RoutingTypes.WebSocketClientAddress;
import joynr.tests.DefaulttestProvider;
import joynr.tests.testProxy;
import joynr.tests.testStatelessAsyncCallback;
import joynr.tests.testTypes.TestEnum;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/joynr/integration/CcRoutingTableCleanupTest.class */
public class CcRoutingTableCleanupTest extends AbstractRoutingTableCleanupTest {
    // Logger shared by this test class and its nested callback.
    private static final Logger logger = LoggerFactory.getLogger(CcRoutingTableCleanupTest.class);
    // Default latch wait time; used as milliseconds in CountDownLatch.await(..., TimeUnit.MILLISECONDS).
    private static final long DEFAULT_WAIT_TIME = 10000;
    // Participant ID of the faked proxy; regenerated in setUp() as "proxy-" + UUID.
    private String proxyParticipantId;
    // Mockito spy of the provider registered in setUp() via setupProvider().
    private DefaulttestProvider testProvider;
    // Use case name returned by StatelessAsyncCallback.getUseCase().
    private static final String STATELESS_ASYNC_USE_CASE = "testStatelessUseCase";
    // Address used for the proxy routing entry in the WebSocket ("ws_") test variants.
    private final WebSocketClientAddress wsClientAddress = new WebSocketClientAddress("wsProxyRuntime");
    // Injects a message as if it had arrived through the WebSocket skeleton.
    ThrowingConsumer<MutableMessage> insertWsMessage = mutableMessage -> {
        fakeIncomingWsMessage(mutableMessage);
    };
    // Injects a message as if it had arrived via MQTT, using the second configured GBID (gbids[1]).
    ThrowingConsumer<MutableMessage> insertMqttMessage = mutableMessage -> {
        fakeIncomingMqttMessage(this.gbids[1], mutableMessage);
    };

    /* loaded from: input_file:io/joynr/integration/CcRoutingTableCleanupTest$StatelessAsyncCallback.class */
    public class StatelessAsyncCallback implements testStatelessAsyncCallback {
        private CountDownLatch resultCdl;

        public StatelessAsyncCallback(CountDownLatch countDownLatch) {
            this.resultCdl = countDownLatch;
        }

        public String getUseCase() {
            return CcRoutingTableCleanupTest.STATELESS_ASYNC_USE_CASE;
        }

        public void sayHelloSuccess(String str, ReplyContext replyContext) {
            CcRoutingTableCleanupTest.logger.info("SUCCESS for {}: {}", replyContext.getMessageId(), str);
            this.resultCdl.countDown();
        }

        public void sayHelloFailed(JoynrRuntimeException joynrRuntimeException, ReplyContext replyContext) {
            CcRoutingTableCleanupTest.logger.error("ERROR for {}: {}", replyContext.getMessageId(), joynrRuntimeException);
            this.resultCdl.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Two-argument consumer whose {@code accept} may throw a checked exception.
     * Used by the tests to pass a message together with a {@code FailureAction}
     * to a messaging skeleton.
     *
     * @param <T> type of the first argument
     * @param <U> type of the second argument
     */
    @FunctionalInterface
    /* loaded from: input_file:io/joynr/integration/CcRoutingTableCleanupTest$ThrowingBiConsumer.class */
    public interface ThrowingBiConsumer<T, U> {
        /** Consumes both arguments; any exception is propagated to the caller. */
        void accept(T t, U u) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Single-argument consumer whose {@code accept} may throw a checked exception.
     * Used by the tests to inject a faked incoming message via WebSocket or MQTT.
     *
     * @param <T> type of the consumed argument
     */
    @FunctionalInterface
    /* loaded from: input_file:io/joynr/integration/CcRoutingTableCleanupTest$ThrowingConsumer.class */
    public interface ThrowingConsumer<T> {
        /** Consumes the argument; any exception is propagated to the caller. */
        void accept(T t) throws Exception;
    }

    /**
     * Runs the base class setup, then creates a fresh proxy participant ID and
     * registers the spied test provider.
     */
    @Override // io.joynr.integration.AbstractRoutingTableCleanupTest
    @Before
    public void setUp() throws InterruptedException, IOException {
        super.setUp();
        final String uuid = JoynrUtil.createUuidString();
        proxyParticipantId = "proxy-" + uuid;
        testProvider = setupProvider();
    }

    /**
     * Verifies that no unexpected messages were sent, unregisters the provider,
     * checks the reference counts of the provider and proxy entries, removes the
     * proxy routing entry and finally delegates to the base class teardown.
     */
    @Override // io.joynr.integration.AbstractRoutingTableCleanupTest
    @After
    public void tearDown() {
        Mockito.reset(inProcessMessagingStubFactorySpy);
        Mockito.verifyNoMoreInteractions(mqttMessagingStubMock);

        // Provider entry: expected 1 before and 0 after unregistration.
        checkRefCnt("provider-1", 1L);
        unregisterGlobal("testCustomDomain1", testProvider);
        checkRefCnt("provider-1", 0L);

        // Proxy entry: only the count from createProxyRoutingEntry is expected to remain.
        checkRefCnt(proxyParticipantId, 1L);
        routingTable.remove(proxyParticipantId);

        Mockito.verifyNoMoreInteractions(mqttMessagingStubMock);
        Mockito.verifyNoMoreInteractions(webSocketClientMessagingStubMock);
        super.tearDown();
    }

    /**
     * Registers a spied {@code DefaulttestProvider} globally in domain
     * "testCustomDomain1" and checks that the "provider-1" reference count goes
     * from 0 to 1.
     *
     * @return the provider spy, reset so that registration-time interactions are not recorded
     */
    private DefaulttestProvider setupProvider() {
        checkRefCnt("provider-1", 0L);
        final DefaulttestProvider providerSpy = Mockito.spy(new DefaulttestProvider());
        registerGlobal(providerSpy, "testCustomDomain1", providerQosGlobal);
        Mockito.reset(providerSpy);
        checkRefCnt("provider-1", 1L);
        return providerSpy;
    }

    /**
     * Adds a routing entry for the faked proxy and checks that its reference
     * count goes from 0 to 1.
     *
     * @param address address to store for the proxy participant ID
     */
    private void createProxyRoutingEntry(Address address) {
        final String participantId = proxyParticipantId;
        checkRefCnt(participantId, 0L);
        routingTable.put(participantId, address, true, Long.MAX_VALUE);
        checkRefCnt(participantId, 1L);
    }

    /**
     * Stubs {@code transmit} of the given messaging stub so that an outgoing
     * message from "provider-1" to the proxy is validated, acknowledged through
     * its {@code SuccessAction}, and the reference counts are checked before and
     * after the acknowledgement.
     *
     * @param iMessagingStub stub mock expected to receive the outgoing message
     * @param messageType    expected type of the outgoing message
     * @param z              if true the proxy reference count is expected to be 2 before the
     *                       success action runs, otherwise 1
     * @return latch that is counted down once the outgoing message has been handled
     */
    private CountDownLatch handleAndCheckOutgoing(IMessagingStub iMessagingStub, Message.MessageType messageType, boolean z) {
        final CountDownLatch handledCdl = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            final ImmutableMessage outgoing = (ImmutableMessage) invocation.getArguments()[0];
            Assert.assertEquals(proxyParticipantId, outgoing.getRecipient());
            Assert.assertEquals("provider-1", outgoing.getSender());
            Assert.assertEquals(messageType, outgoing.getType());
            checkRefCnt(proxyParticipantId, z ? 2L : 1L);

            final SuccessAction onSuccess = (SuccessAction) invocation.getArguments()[1];
            onSuccess.execute();

            checkRefCnt("provider-1", 1L);
            checkRefCnt(proxyParticipantId, 1L);
            handledCdl.countDown();
            return null;
        })
               .when(iMessagingStub)
               .transmit(ArgumentMatchers.any(ImmutableMessage.class),
                         ArgumentMatchers.any(SuccessAction.class),
                         ArgumentMatchers.any(FailureAction.class));
        return handledCdl;
    }

    /**
     * Verifies that {@code transmit} was called on the given stub exactly
     * {@code i} times with a message of the given type.
     *
     * @param iMessagingStub stub mock to verify
     * @param messageType    message type the transmitted message must have
     * @param i              expected number of matching invocations
     */
    private void verifyOutgoing(IMessagingStub iMessagingStub, final Message.MessageType messageType, int i) {
        final ArgumentMatcher<ImmutableMessage> hasExpectedType = new ArgumentMatcher<ImmutableMessage>() {
            public boolean matches(ImmutableMessage candidate) {
                return messageType.equals(candidate.getType());
            }
        };
        Mockito.verify(iMessagingStub, Mockito.times(i))
               .transmit(ArgumentMatchers.argThat(hasExpectedType),
                         ArgumentMatchers.any(SuccessAction.class),
                         ArgumentMatchers.any(FailureAction.class));
    }

    /**
     * Checks the first argument of the intercepted invocation: it must be a
     * message of the given type addressed to "provider-1". Also checks the
     * reference counts of provider and proxy. Assertion failures are logged
     * before being rethrown.
     *
     * @param z                if true the proxy reference count is expected to be 2, otherwise 1
     * @param invocationOnMock intercepted invocation whose first argument is the message
     * @param messageType      expected message type
     */
    private void checkIncominMsgAndRefCounts(boolean z, InvocationOnMock invocationOnMock, Message.MessageType messageType) {
        final ImmutableMessage incoming = (ImmutableMessage) invocationOnMock.getArguments()[0];
        try {
            Assert.assertEquals(messageType, incoming.getType());
            Assert.assertEquals("provider-1", incoming.getRecipient());
            checkRefCnt("provider-1", 1L);
            if (z) {
                checkRefCnt(proxyParticipantId, 2L);
            } else {
                checkRefCnt(proxyParticipantId, 1L);
            }
        } catch (AssertionError assertionError) {
            logger.error("TEST FAILED in checkIncominMsgAndRefCounts", assertionError);
            throw assertionError;
        }
    }

    /**
     * Stubs {@code voidOperation} of the provider spy so that it checks the
     * reference counts, then blocks until the returned latch is counted down,
     * and only then calls the real method.
     *
     * @param z if true the proxy reference count is expected to be 2 while the request is
     *          being processed, otherwise 1
     * @return latch the caller counts down to let the provider call proceed
     */
    private CountDownLatch delayProviderVoidOperation(boolean z) {
        final CountDownLatch releaseCdl = new CountDownLatch(1);
        final long expectedProxyRefCnt = z ? 2L : 1L;
        Mockito.doAnswer(invocation -> {
            checkRefCnt("provider-1", 1L);
            checkRefCnt(proxyParticipantId, expectedProxyRefCnt);
            releaseCdl.await();
            return (Promise) invocation.callRealMethod();
        }).when(testProvider).voidOperation();
        return releaseCdl;
    }

    /**
     * Feeds the serialized message into the WebSocket messaging skeleton as if it
     * had been received from a WebSocket client. The test fails if the skeleton
     * invokes the failure action.
     *
     * @param mutableMessage message to inject
     */
    private void fakeIncomingWsMessage(MutableMessage mutableMessage) throws EncodingException, UnsuppportedVersionException {
        final IWebSocketMessagingSkeleton wsSkeleton = (IWebSocketMessagingSkeleton) messagingSkeletonFactory.getSkeleton(new WebSocketClientAddress())
                                                                                                             .get();
        final FailureAction failTest = error -> Assert.fail("fake incoming WS message failed in skeleton.transmit: "
                + error);
        wsSkeleton.transmit(mutableMessage.getImmutableMessage().getSerializedMessage(), failTest);
    }

    /**
     * Creates a request from the proxy to "provider-1" and injects it through
     * the given consumer. Any exception fails the test.
     *
     * @param throwingConsumer transport-specific message injector
     */
    private void fakeIncomingRequest(ThrowingConsumer<MutableMessage> throwingConsumer) {
        try {
            final MutableMessage request = createRequestMsg(proxyParticipantId, "provider-1");
            throwingConsumer.accept(request);
        } catch (Exception cause) {
            Assert.fail("fake incoming request failed: " + cause);
        }
    }

    /**
     * Creates a subscription request from the proxy to "provider-1" and injects
     * it through the given consumer. Any exception fails the test.
     *
     * @param throwingConsumer transport-specific message injector
     * @param str              subscription ID passed to {@code createSrqMsg}
     * @param j                value passed through to {@code createSrqMsg}
     *                         (presumably the subscription validity in ms; confirm in base class)
     */
    private void fakeIncomingSrq(ThrowingConsumer<MutableMessage> throwingConsumer, String str, long j) {
        try {
            final MutableMessage subscriptionRequest = createSrqMsg(proxyParticipantId, "provider-1", str, j);
            throwingConsumer.accept(subscriptionRequest);
        } catch (Exception cause) {
            Assert.fail("fake incoming subscription request failed: " + cause);
        }
    }

    /**
     * Injects a subscription stop from the proxy to "provider-1" and intercepts
     * its delivery to the in-process stub.
     *
     * <p>The in-process stub factory spy is stubbed so that every created stub is
     * itself wrapped in a spy. When that stub transmits, the message is checked to
     * be a subscription stop for "provider-1", reference counts are checked, the
     * real transmit is called, the success action is executed and the returned
     * latch is counted down.
     *
     * @param throwingConsumer transport-specific message injector
     * @param str              subscription ID to stop
     * @param z                if true the proxy reference count is expected to be 2 when the
     *                         stop message arrives, otherwise 1
     * @return latch counted down once the subscription stop has been delivered
     */
    private CountDownLatch fakeIncomingSst(ThrowingConsumer<MutableMessage> throwingConsumer, String str, boolean z) {
        CountDownLatch sstCdl = new CountDownLatch(1);
        // The two lambda parameters need distinct names: a nested lambda may not
        // redeclare the parameter name of the enclosing lambda.
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessStubSpy = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                ImmutableMessage immutableMessage = (ImmutableMessage) transmitInvocation.getArguments()[0];
                Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_STOP, immutableMessage.getType());
                Assert.assertEquals("provider-1", immutableMessage.getRecipient());
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, z ? 2L : 1L);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                sstCdl.countDown();
                return null;
            }).when(inProcessStubSpy).transmit(ArgumentMatchers.any(ImmutableMessage.class), ArgumentMatchers.any(SuccessAction.class), ArgumentMatchers.any(FailureAction.class));
            return inProcessStubSpy;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));
        try {
            throwingConsumer.accept(this.messageFactory.createSubscriptionStop(this.proxyParticipantId, "provider-1", new SubscriptionStop(str), this.defaultMessagingQos));
        } catch (Exception e) {
            Assert.fail("fake incoming subscription stop failed: " + e);
        }
        return sstCdl;
    }

    /**
     * Creates an {@code InProcessMessagingStub} mock and makes the in-process
     * stub factory spy return it for any in-process address.
     *
     * @return the mock that the factory will hand out
     */
    private InProcessMessagingStub mockInProcessStub() {
        final InProcessMessagingStub stubMock = Mockito.mock(InProcessMessagingStub.class);
        Mockito.doReturn(stubMock)
               .when(inProcessMessagingStubFactorySpy)
               .create(ArgumentMatchers.any(InProcessAddress.class));
        return stubMock;
    }

    /**
     * Request/reply success scenario: an incoming request is held inside the
     * provider until the outgoing reply stubbing is in place, then released; the
     * reply must be transmitted exactly once.
     *
     * @param address          address for the proxy routing entry
     * @param iMessagingStub   stub mock expected to receive the reply
     * @param z                if true the proxy reference count is expected to be 2 while the
     *                         request is in flight, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_success(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);

        final CountDownLatch releaseProviderCdl = delayProviderVoidOperation(z);
        fakeIncomingRequest(throwingConsumer);

        final CountDownLatch replyCdl = handleAndCheckOutgoing(iMessagingStub,
                                                               Message.MessageType.VALUE_MESSAGE_TYPE_REPLY,
                                                               z);
        releaseProviderCdl.countDown();
        waitFor(replyCdl, DEFAULT_WAIT_TIME);

        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_REPLY, 1);
    }

    /** Runs {@code rqRp_success} with the MQTT reply-to address, stub mock and injector. */
    @Test
    public void mqtt_rqRp_success() {
        rqRp_success(replyToAddress, mqttMessagingStubMock, true, insertMqttMessage);
    }

    /** Runs {@code rqRp_success} with the WebSocket client address, stub mock and injector. */
    @Test
    public void ws_rqRp_success() {
        rqRp_success(wsClientAddress, webSocketClientMessagingStubMock, false, insertWsMessage);
    }

    /**
     * Scenario in which the provider is released only after the round-trip TTL
     * has elapsed. Afterwards the reference counts of provider and proxy must
     * both be 1.
     *
     * @param address          address for the proxy routing entry
     * @param z                if true the proxy reference count is expected to be 2 while the
     *                         request is being processed, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_error_rpExpired(Address address, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        final CountDownLatch releaseProviderCdl = delayProviderVoidOperation(z);

        defaultMessagingQos.setTtl_ms(512L);
        fakeIncomingRequest(throwingConsumer);

        // Hold the provider until the round-trip TTL has passed, then let it finish.
        sleep(defaultMessagingQos.getRoundTripTtl_ms());
        releaseProviderCdl.countDown();
        sleep(500L);

        checkRefCnt("provider-1", 1L);
        checkRefCnt(proxyParticipantId, 1L);
    }

    /** Runs {@code rqRp_error_rpExpired} with the MQTT reply-to address and injector. */
    @Test
    public void mqttRqRp_error_rpExpired() {
        rqRp_error_rpExpired(replyToAddress, true, insertMqttMessage);
    }

    /** Runs {@code rqRp_error_rpExpired} with the WebSocket client address and injector. */
    @Test
    public void ws_rqRp_error_rpExpired() {
        rqRp_error_rpExpired(wsClientAddress, false, insertWsMessage);
    }

    /**
     * Scenario in which the request is created with TTL 0 and injected after a
     * 1 ms sleep. The failure action must receive a
     * {@code JoynrMessageNotSentException} whose message contains "expired"; the
     * reference counts must remain 1 and the provider must not be called.
     *
     * @param address            address for the proxy routing entry
     * @param iMessagingStub     not used by this scenario
     * @param z                  not used by this scenario
     * @param throwingBiConsumer hands the message and failure action to the transport skeleton
     */
    private void rqRp_error_rqExpired(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingBiConsumer<MutableMessage, FailureAction> throwingBiConsumer) {
        createProxyRoutingEntry(address);
        defaultMessagingQos.setTtl_ms(0L);
        final MutableMessage expiredRequest = createRequestMsg(proxyParticipantId, "provider-1");
        final CountDownLatch failureCdl = new CountDownLatch(1);
        final FailureAction expectExpired = error -> {
            Assert.assertTrue(error instanceof JoynrMessageNotSentException);
            Assert.assertTrue(error.getMessage().contains("expired"));
            failureCdl.countDown();
        };
        sleep(1L);
        try {
            throwingBiConsumer.accept(expiredRequest, expectExpired);
        } catch (Exception cause) {
            Assert.fail("fake incoming request failed: " + cause);
        }
        waitFor(failureCdl, DEFAULT_WAIT_TIME);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(testProvider);
    }

    /**
     * Runs {@code rqRp_error_rqExpired} for MQTT by handing the serialized
     * message and its prefixed custom headers directly to the MQTT skeleton.
     */
    @Test
    public void mqtt_rqRp_error_rqExpired() {
        rqRp_error_rqExpired(replyToAddress, mqttMessagingStubMock, true, (message, onFailure) -> {
            mqttSkeletonFactory.getSkeleton(replyToAddress)
                               .transmit(message.getImmutableMessage().getSerializedMessage(),
                                         message.getImmutableMessage().getPrefixedCustomHeaders(),
                                         onFailure);
        });
    }

    /**
     * Runs {@code rqRp_error_rqExpired} for WebSocket by handing the serialized
     * message directly to the WebSocket skeleton.
     */
    @Test
    public void ws_rqRp_error_rqExpired() {
        rqRp_error_rqExpired(wsClientAddress, webSocketClientMessagingStubMock, false, (message, onFailure) -> {
            ((IWebSocketMessagingSkeleton) messagingSkeletonFactory.getSkeleton(new WebSocketClientAddress())
                                                                   .get()).transmit(message.getImmutableMessage()
                                                                                           .getSerializedMessage(),
                                                                                    onFailure);
        });
    }

    /**
     * Scenario in which the outgoing reply is rejected by the stub with a
     * {@code JoynrDelayMessageException} whose delay reaches just past the
     * message's TTL. After waiting out the round-trip TTL the reference counts
     * must be back to 1 and exactly one reply transmit must have happened.
     *
     * @param address          address for the proxy routing entry
     * @param iMessagingStub   stub mock that receives (and rejects) the reply
     * @param z                if true the proxy reference count is expected to be 2 when the
     *                         reply reaches the stub, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_error_rpExpiredInMessageWorker(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        final CountDownLatch replyRejectedCdl = new CountDownLatch(1);
        defaultMessagingQos.setTtl_ms(512L);
        Mockito.doAnswer(invocation -> {
            final ImmutableMessage reply = (ImmutableMessage) invocation.getArguments()[0];
            Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_REPLY, reply.getType());
            Assert.assertEquals(proxyParticipantId, reply.getRecipient());
            checkRefCnt("provider-1", 1L);
            checkRefCnt(proxyParticipantId, z ? 2L : 1L);

            final FailureAction onFailure = (FailureAction) invocation.getArguments()[2];
            // Delay of (remaining TTL + 1 ms).
            final long delayMs = (reply.getTtlMs() - System.currentTimeMillis()) + 1;
            onFailure.execute(new JoynrDelayMessageException(delayMs, "test expired reply in MessageWorker"));
            replyRejectedCdl.countDown();
            return null;
        })
               .when(iMessagingStub)
               .transmit(ArgumentMatchers.any(ImmutableMessage.class),
                         ArgumentMatchers.any(SuccessAction.class),
                         ArgumentMatchers.any(FailureAction.class));

        fakeIncomingRequest(throwingConsumer);
        waitFor(replyRejectedCdl, DEFAULT_WAIT_TIME);
        sleep(defaultMessagingQos.getRoundTripTtl_ms() + 100);

        checkRefCnt("provider-1", 1L);
        checkRefCnt(proxyParticipantId, 1L);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_REPLY, 1);
    }

    /** Runs {@code rqRp_error_rpExpiredInMessageWorker} with the MQTT address, stub mock and injector. */
    @Test
    public void mqtt_rqRp_error_rpExpiredInMessageWorker() {
        rqRp_error_rpExpiredInMessageWorker(replyToAddress, mqttMessagingStubMock, true, insertMqttMessage);
    }

    /** Runs {@code rqRp_error_rpExpiredInMessageWorker} with the WebSocket address, stub mock and injector. */
    @Test
    public void ws_rqRp_error_rpExpiredInMessageWorker() {
        rqRp_error_rpExpiredInMessageWorker(wsClientAddress, webSocketClientMessagingStubMock, false, insertWsMessage);
    }

    /**
     * Scenario in which the incoming request is rejected by the (mocked)
     * in-process stub with a {@code JoynrDelayMessageException} whose delay
     * reaches just past the message's TTL. After waiting out the round-trip TTL
     * the reference counts must be back to 1 and the provider must not have been
     * called.
     *
     * @param address          address for the proxy routing entry
     * @param z                if true the proxy reference count is expected to be 2 when the
     *                         request reaches the in-process stub, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_error_rqExpiredInMessageWorker_noFakeReply(Address address, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        CountDownLatch rqRejectedCdl = new CountDownLatch(1);
        this.defaultMessagingQos.setTtl_ms(512L);
        // Create the stub mock BEFORE starting doAnswer(...): mockInProcessStub() performs its
        // own doReturn(...).when(...) stubbing, and starting a second stubbing while the first
        // one is unfinished is reported by Mockito as unfinished stubbing.
        InProcessMessagingStub inProcessStubMock = mockInProcessStub();
        Mockito.doAnswer(invocationOnMock -> {
            ImmutableMessage immutableMessage = (ImmutableMessage) invocationOnMock.getArguments()[0];
            checkIncominMsgAndRefCounts(z, invocationOnMock, Message.MessageType.VALUE_MESSAGE_TYPE_REQUEST);
            ((FailureAction) invocationOnMock.getArguments()[2]).execute(new JoynrDelayMessageException((immutableMessage.getTtlMs() - System.currentTimeMillis()) + 1, "test expired request in MessageWorker"));
            rqRejectedCdl.countDown();
            return null;
        }).when(inProcessStubMock).transmit(ArgumentMatchers.any(ImmutableMessage.class), ArgumentMatchers.any(SuccessAction.class), ArgumentMatchers.any(FailureAction.class));
        fakeIncomingRequest(throwingConsumer);
        waitFor(rqRejectedCdl, DEFAULT_WAIT_TIME);
        sleep(this.defaultMessagingQos.getRoundTripTtl_ms() + 100);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(this.testProvider);
    }

    /** Runs {@code rqRp_error_rqExpiredInMessageWorker_noFakeReply} with the MQTT address and injector. */
    @Test
    public void mqtt_rqRp_error_rqExpiredInMessageWorker_noFakeReply() {
        rqRp_error_rqExpiredInMessageWorker_noFakeReply(replyToAddress, true, insertMqttMessage);
    }

    /**
     * Runs {@code rqRp_error_rqExpiredInMessageWorker_noFakeReply} with the
     * WebSocket address and injector.
     * NOTE(review): the method name starts with "ws_qRp" where sibling tests use "ws_rqRp".
     */
    @Test
    public void ws_qRp_error_rqExpiredInMessageWorker_noFakeReply() {
        rqRp_error_rqExpiredInMessageWorker_noFakeReply(wsClientAddress, false, insertWsMessage);
    }

    /**
     * Scenario in which the (mocked) in-process stub rejects the incoming request
     * with a {@code JoynrMessageNotSentException}, and the incoming message is
     * replaced by a spy whose {@code getEffort()} throws. Judging by the scenario
     * name and exception text this is meant to make fake-reply creation fail.
     * The reference counts must remain 1 and the provider must not be called.
     *
     * @param address          address for the proxy routing entry
     * @param z                if true the proxy reference count is expected to be 2 when the
     *                         request reaches the in-process stub, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_error_rqErrorFromStub_fakeReplyCreationFails(Address address, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        CountDownLatch rqRejectedCdl = new CountDownLatch(1);
        // Create the stub mock BEFORE starting doAnswer(...): mockInProcessStub() performs its
        // own doReturn(...).when(...) stubbing, and starting a second stubbing while the first
        // one is unfinished is reported by Mockito as unfinished stubbing.
        InProcessMessagingStub inProcessStubMock = mockInProcessStub();
        Mockito.doAnswer(invocationOnMock -> {
            checkIncominMsgAndRefCounts(z, invocationOnMock, Message.MessageType.VALUE_MESSAGE_TYPE_REQUEST);
            ((FailureAction) invocationOnMock.getArguments()[2]).execute(new JoynrMessageNotSentException("test fake reply creation fails"));
            rqRejectedCdl.countDown();
            return null;
        }).when(inProcessStubMock).transmit(ArgumentMatchers.any(ImmutableMessage.class), ArgumentMatchers.any(SuccessAction.class), ArgumentMatchers.any(FailureAction.class));
        MutableMessage requestMsg = createRequestMsg(this.proxyParticipantId, "provider-1");
        // Replace every incoming message by a spy whose getEffort() throws.
        ((JoynrMessageProcessor) Mockito.doAnswer(invocationOnMock2 -> {
            ImmutableMessage immutableMessageSpy = Mockito.spy((ImmutableMessage) invocationOnMock2.getArguments()[0]);
            Mockito.doThrow(new RuntimeException("force fake reply creation error")).when(immutableMessageSpy).getEffort();
            return immutableMessageSpy;
        }).when(this.messageProcessorMock)).processIncoming(ArgumentMatchers.any(ImmutableMessage.class));
        try {
            throwingConsumer.accept(requestMsg);
        } catch (Exception e) {
            Assert.fail("fake incoming request failed: " + e);
        }
        waitFor(rqRejectedCdl, DEFAULT_WAIT_TIME);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(this.testProvider);
    }

    /** Runs {@code rqRp_error_rqErrorFromStub_fakeReplyCreationFails} with the MQTT address and injector. */
    @Test
    public void mqtt_rqRp_error_rqErrorFromStub_fakeReplyCreationFails() {
        rqRp_error_rqErrorFromStub_fakeReplyCreationFails(replyToAddress, true, insertMqttMessage);
    }

    /** Runs {@code rqRp_error_rqErrorFromStub_fakeReplyCreationFails} with the WebSocket address and injector. */
    @Test
    public void ws_rqRp_error_rqErrorFromStub_fakeReplyCreationFails() {
        rqRp_error_rqErrorFromStub_fakeReplyCreationFails(wsClientAddress, false, insertWsMessage);
    }

    /**
     * Scenario in which the (mocked) in-process stub rejects the incoming request
     * with a {@code JoynrMessageNotSentException}. A reply to the proxy is then
     * expected on the outgoing stub exactly once, while the provider itself must
     * not be called; reference counts must end at 1.
     *
     * @param address          address for the proxy routing entry
     * @param iMessagingStub   stub mock expected to receive the reply
     * @param z                if true the proxy reference count is expected to be 2 while
     *                         messages are in flight, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_error_rqErrorFromStub_fakeReply(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        CountDownLatch rqRejectedCdl = new CountDownLatch(1);
        // Create the stub mock BEFORE starting doAnswer(...): mockInProcessStub() performs its
        // own doReturn(...).when(...) stubbing, and starting a second stubbing while the first
        // one is unfinished is reported by Mockito as unfinished stubbing.
        InProcessMessagingStub inProcessStubMock = mockInProcessStub();
        Mockito.doAnswer(invocationOnMock -> {
            checkIncominMsgAndRefCounts(z, invocationOnMock, Message.MessageType.VALUE_MESSAGE_TYPE_REQUEST);
            ((FailureAction) invocationOnMock.getArguments()[2]).execute(new JoynrMessageNotSentException("test fake reply creation"));
            rqRejectedCdl.countDown();
            return null;
        }).when(inProcessStubMock).transmit(ArgumentMatchers.any(ImmutableMessage.class), ArgumentMatchers.any(SuccessAction.class), ArgumentMatchers.any(FailureAction.class));
        CountDownLatch replyCdl = handleAndCheckOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_REPLY, z);
        fakeIncomingRequest(throwingConsumer);
        waitFor(rqRejectedCdl, DEFAULT_WAIT_TIME);
        waitFor(replyCdl, DEFAULT_WAIT_TIME);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(this.testProvider);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_REPLY, 1);
    }

    /** Runs {@code rqRp_error_rqErrorFromStub_fakeReply} with the MQTT address, stub mock and injector. */
    @Test
    public void mqtt_rqRp_error_rqErrorFromStub_fakeReply() {
        rqRp_error_rqErrorFromStub_fakeReply(replyToAddress, mqttMessagingStubMock, true, insertMqttMessage);
    }

    /** Runs {@code rqRp_error_rqErrorFromStub_fakeReply} with the WebSocket address, stub mock and injector. */
    @Test
    public void ws_rqRp_error_rqErrorFromStub_fakeReply() {
        rqRp_error_rqErrorFromStub_fakeReply(wsClientAddress, webSocketClientMessagingStubMock, false, insertWsMessage);
    }

    /**
     * Scenario in which the request is marked as having a non-absolute TTL. The
     * failure action must receive a {@code JoynrRuntimeException} whose message
     * contains "Relative ttl not supported"; reference counts must remain 1 and
     * the provider must not be called.
     *
     * @param address            address for the proxy routing entry
     * @param throwingBiConsumer hands the message and failure action to the transport skeleton
     */
    private void rqRp_error_rqWithRelativeTtl(Address address, ThrowingBiConsumer<MutableMessage, FailureAction> throwingBiConsumer) {
        createProxyRoutingEntry(address);
        final MutableMessage relativeTtlRequest = createRequestMsg(proxyParticipantId, "provider-1");
        relativeTtlRequest.setTtlAbsolute(false);
        final CountDownLatch failureCdl = new CountDownLatch(1);
        final FailureAction expectRelativeTtlError = error -> {
            Assert.assertTrue(error instanceof JoynrRuntimeException);
            Assert.assertTrue(error.getMessage().contains("Relative ttl not supported"));
            failureCdl.countDown();
        };
        try {
            throwingBiConsumer.accept(relativeTtlRequest, expectRelativeTtlError);
        } catch (Exception cause) {
            Assert.fail("fake incoming request failed: " + cause);
        }
        waitFor(failureCdl, DEFAULT_WAIT_TIME);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(testProvider);
    }

    /**
     * Runs {@code rqRp_error_rqWithRelativeTtl} for MQTT by handing the
     * serialized message and its prefixed custom headers directly to the MQTT
     * skeleton.
     */
    @Test
    public void mqtt_rqRp_error_rqWithRelativeTtl() {
        rqRp_error_rqWithRelativeTtl(replyToAddress, (message, onFailure) -> {
            mqttSkeletonFactory.getSkeleton(replyToAddress)
                               .transmit(message.getImmutableMessage().getSerializedMessage(),
                                         message.getImmutableMessage().getPrefixedCustomHeaders(),
                                         onFailure);
        });
    }

    /**
     * Runs {@code rqRp_error_rqWithRelativeTtl} for WebSocket by handing the
     * serialized message directly to the WebSocket skeleton.
     */
    @Test
    public void ws_rqRp_error_rqWithRelativeTtl() {
        rqRp_error_rqWithRelativeTtl(wsClientAddress, (message, onFailure) -> {
            ((IWebSocketMessagingSkeleton) messagingSkeletonFactory.getSkeleton(new WebSocketClientAddress())
                                                                   .get()).transmit(message.getImmutableMessage()
                                                                                           .getSerializedMessage(),
                                                                                    onFailure);
        });
    }

    /**
     * Scenario in which the (mocked) in-process stub rejects every delivery
     * attempt of the incoming request with a zero-delay
     * {@code JoynrDelayMessageException}. Exactly three transmit attempts are
     * expected, after which nothing else may be sent, the provider must not be
     * called and the reference counts must be 1.
     *
     * @param address          address for the proxy routing entry
     * @param iMessagingStub   outgoing stub mock that must see no further interactions
     * @param z                if true the proxy reference count is expected to be 2 when the
     *                         request reaches the in-process stub, otherwise 1
     * @param throwingConsumer transport-specific message injector
     */
    private void rqRp_error_rqMaxRetryReached(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        final CountDownLatch attemptsCdl = new CountDownLatch(3);
        final InProcessMessagingStub inProcessStubMock = mockInProcessStub();
        Mockito.doAnswer(invocation -> {
            checkIncominMsgAndRefCounts(z, invocation, Message.MessageType.VALUE_MESSAGE_TYPE_REQUEST);
            final FailureAction onFailure = (FailureAction) invocation.getArguments()[2];
            onFailure.execute(new JoynrDelayMessageException(0L, "test max retry count"));
            attemptsCdl.countDown();
            return null;
        })
               .when(inProcessStubMock)
               .transmit(ArgumentMatchers.any(ImmutableMessage.class),
                         ArgumentMatchers.any(SuccessAction.class),
                         ArgumentMatchers.any(FailureAction.class));

        fakeIncomingRequest(throwingConsumer);
        waitFor(attemptsCdl, DEFAULT_WAIT_TIME);

        checkRefCnt("provider-1", 1L);
        checkRefCnt(proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(testProvider);
        Mockito.verify(inProcessStubMock, Mockito.times(3))
               .transmit(ArgumentMatchers.any(ImmutableMessage.class),
                         ArgumentMatchers.any(SuccessAction.class),
                         ArgumentMatchers.any(FailureAction.class));
        Mockito.verifyNoMoreInteractions(iMessagingStub, inProcessStubMock);
    }

    /** Runs {@code rqRp_error_rqMaxRetryReached} with the MQTT address, stub mock and injector. */
    @Test
    public void mqtt_rqRp_error_rqMaxRetryReached() {
        rqRp_error_rqMaxRetryReached(replyToAddress, mqttMessagingStubMock, true, insertMqttMessage);
    }

    /** Runs {@code rqRp_error_rqMaxRetryReached} with the WebSocket address, stub mock and injector. */
    @Test
    public void ws_rqRp_error_rqMaxRetryReached() {
        rqRp_error_rqMaxRetryReached(wsClientAddress, webSocketClientMessagingStubMock, false, insertWsMessage);
    }

    /**
     * Subscription scenario terminated by an explicit subscription stop.
     *
     * <ol>
     * <li>A broadcast subscription request is injected; its delivery to the
     * in-process stub and the outgoing subscription reply are intercepted and the
     * reference counts are checked at each step.</li>
     * <li>Two broadcasts are fired and two publications are expected on the
     * outgoing stub.</li>
     * <li>A subscription stop is injected; a further broadcast must not produce
     * any outgoing message and the proxy reference count must drop back to 1.</li>
     * </ol>
     *
     * @param address          address for the proxy routing entry
     * @param iMessagingStub   stub mock expected to receive subscription reply and publications
     * @param z                if true the proxy reference count is expected to be one higher
     *                         (3 instead of 2) while the subscription request / reply is in flight
     * @param throwingConsumer transport-specific message injector
     */
    private void srqSrp_success_stoppedBySst(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        CountDownLatch srqCdl = new CountDownLatch(1);
        // The two lambda parameters need distinct names: a nested lambda may not
        // redeclare the parameter name of the enclosing lambda.
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessStubSpy = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                checkIncominMsgAndRefCounts(z, transmitInvocation, Message.MessageType.VALUE_MESSAGE_TYPE_BROADCAST_SUBSCRIPTION_REQUEST);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
                srqCdl.countDown();
                return null;
            }).when(inProcessStubSpy).transmit(ArgumentMatchers.any(ImmutableMessage.class), ArgumentMatchers.any(SuccessAction.class), ArgumentMatchers.any(FailureAction.class));
            return inProcessStubSpy;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));
        CountDownLatch srpCdl = new CountDownLatch(1);
        CountDownLatch publicationCdl = new CountDownLatch(2);
        Mockito.doAnswer(outgoingInvocation -> {
            ImmutableMessage immutableMessage = (ImmutableMessage) outgoingInvocation.getArguments()[0];
            Assert.assertEquals(this.proxyParticipantId, immutableMessage.getRecipient());
            if (Message.MessageType.VALUE_MESSAGE_TYPE_PUBLICATION.equals(immutableMessage.getType())) {
                // Publications are only counted; their success action is not executed here.
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, 2L);
                publicationCdl.countDown();
                return null;
            }
            Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, immutableMessage.getType());
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
            ((SuccessAction) outgoingInvocation.getArguments()[1]).execute();
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, 2L);
            srpCdl.countDown();
            return null;
        }).when(iMessagingStub).transmit(ArgumentMatchers.any(ImmutableMessage.class), ArgumentMatchers.any(SuccessAction.class), ArgumentMatchers.any(FailureAction.class));
        String subscriptionId = JoynrUtil.createUuidString();
        fakeIncomingSrq(throwingConsumer, subscriptionId, 5000L);
        waitFor(srqCdl, DEFAULT_WAIT_TIME);
        waitFor(srpCdl, DEFAULT_WAIT_TIME);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, 1);
        this.testProvider.fireIntBroadcast(42);
        this.testProvider.fireIntBroadcast(43);
        try {
            Assert.assertTrue(publicationCdl.await(DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe the interruption.
            Thread.currentThread().interrupt();
            Assert.fail("Wait for publication failed: " + e);
        }
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_PUBLICATION, 2);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 2L);
        waitFor(fakeIncomingSst(throwingConsumer, subscriptionId, true), DEFAULT_WAIT_TIME);
        // After the stop, a further broadcast must not reach the outgoing stub.
        this.testProvider.fireIntBroadcast(44);
        sleep(200L);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(iMessagingStub);
    }

    /**
     * Runs {@code srqSrp_success_stoppedBySst} with {@code replyToAddress},
     * {@code mqttMessagingStubMock}, the flag set to {@code true} and {@code insertMqttMessage}.
     */
    @Test
    public void mqtt_srqSrp_success_stoppedBySst() {
        srqSrp_success_stoppedBySst(
                replyToAddress,
                mqttMessagingStubMock,
                true,
                insertMqttMessage);
    }

    /**
     * Runs {@code srqSrp_success_stoppedBySst} with {@code wsClientAddress},
     * {@code webSocketClientMessagingStubMock}, the flag set to {@code false} and
     * {@code insertWsMessage}.
     */
    @Test
    public void ws_srqSrp_success_stoppedBySst() {
        srqSrp_success_stoppedBySst(
                wsClientAddress,
                webSocketClientMessagingStubMock,
                false,
                insertWsMessage);
    }

    /**
     * Shared scenario: an incoming broadcast subscription request is answered with a subscription
     * reply, two publications are transmitted, and afterwards no further publication is sent even
     * though no subscription stop is injected.
     *
     * <p>The routing reference counts of {@code "provider-1"} and of the proxy participant are
     * checked at every step via {@code checkRefCnt}.
     *
     * @param address          address used to create the routing entry of the proxy
     * @param iMessagingStub   mocked stub on which the outgoing reply and publications are expected
     * @param z                when {@code true}, one additional reference on the proxy participant
     *                         is expected while the request/reply is in flight (3 instead of 2)
     * @param throwingConsumer injects the faked incoming message into the runtime
     */
    private void srqSrp_success_stoppedByExpiration(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);

        // Spy on every in-process stub created by the factory so that the delivery of the
        // incoming subscription request can be observed. The two lambdas are nested, so their
        // parameters need distinct names.
        CountDownLatch requestDelivered = new CountDownLatch(1);
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessMessagingStub = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                checkIncominMsgAndRefCounts(z, transmitInvocation, Message.MessageType.VALUE_MESSAGE_TYPE_BROADCAST_SUBSCRIPTION_REQUEST);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
                requestDelivered.countDown();
                return null;
            }).when(inProcessMessagingStub).transmit(
                    ArgumentMatchers.any(ImmutableMessage.class),
                    ArgumentMatchers.any(SuccessAction.class),
                    ArgumentMatchers.any(FailureAction.class));
            return inProcessMessagingStub;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));

        // Outgoing side: publications only count down, the subscription reply is acknowledged
        // through its success action.
        CountDownLatch replySent = new CountDownLatch(1);
        CountDownLatch publicationsSent = new CountDownLatch(2);
        Mockito.doAnswer(outgoingInvocation -> {
            ImmutableMessage immutableMessage = (ImmutableMessage) outgoingInvocation.getArguments()[0];
            Assert.assertEquals(this.proxyParticipantId, immutableMessage.getRecipient());
            if (Message.MessageType.VALUE_MESSAGE_TYPE_PUBLICATION.equals(immutableMessage.getType())) {
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, 2L);
                publicationsSent.countDown();
                return null;
            }
            Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, immutableMessage.getType());
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
            // Do not acknowledge the reply before the request handling has finished; a timeout
            // is only logged here, the test itself fails later in waitFor if the latch never opens.
            if (!requestDelivered.await(DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS)) {
                logger.error("FAILURE: waiting for srq timed out");
            }
            ((SuccessAction) outgoingInvocation.getArguments()[1]).execute();
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, 2L);
            replySent.countDown();
            return null;
        }).when(iMessagingStub).transmit(
                ArgumentMatchers.any(ImmutableMessage.class),
                ArgumentMatchers.any(SuccessAction.class),
                ArgumentMatchers.any(FailureAction.class));

        // NOTE(review): 753 is presumably the subscription validity in ms - confirm in fakeIncomingSrq.
        fakeIncomingSrq(throwingConsumer, JoynrUtil.createUuidString(), 753L);
        waitFor(requestDelivered, DEFAULT_WAIT_TIME);
        waitFor(replySent, DEFAULT_WAIT_TIME);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, 1);

        this.testProvider.fireIntBroadcast(42);
        this.testProvider.fireIntBroadcast(43);
        try {
            Assert.assertTrue(publicationsSent.await(DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Assert.fail("Sleep/wait for publications failed: " + e);
        }

        // After this pause the proxy participant is expected to be back at a single reference.
        sleep(853L);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_PUBLICATION, 2);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);

        // A further broadcast must not produce any additional outgoing message.
        this.testProvider.fireIntBroadcast(44);
        sleep(200L);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(iMessagingStub);
    }

    /**
     * Runs {@link #srqSrp_success_stoppedByExpiration} with {@code replyToAddress},
     * {@code mqttMessagingStubMock}, the flag set to {@code true} and {@code insertMqttMessage}.
     */
    @Test
    public void mqtt_srqSrp_success_stoppedByExpiration() {
        srqSrp_success_stoppedByExpiration(
                replyToAddress,
                mqttMessagingStubMock,
                true,
                insertMqttMessage);
    }

    /**
     * Runs {@link #srqSrp_success_stoppedByExpiration} with {@code wsClientAddress},
     * {@code webSocketClientMessagingStubMock}, the flag set to {@code false} and
     * {@code insertWsMessage}.
     */
    @Test
    public void ws_srqSrp_success_stoppedByExpiration() {
        srqSrp_success_stoppedByExpiration(
                wsClientAddress,
                webSocketClientMessagingStubMock,
                false,
                insertWsMessage);
    }

    /**
     * Shared scenario: an incoming broadcast subscription request is processed, but the
     * transmission of the subscription reply is failed with a {@link JoynrDelayMessageException}
     * whose delay reaches past the reply's TTL. Exactly one subscription reply transmission is
     * expected and a later broadcast must not cause any further outgoing message.
     *
     * <p>The routing reference counts of {@code "provider-1"} and of the proxy participant are
     * checked at every step via {@code checkRefCnt}.
     *
     * @param address          address used to create the routing entry of the proxy
     * @param iMessagingStub   mocked stub on which the outgoing reply is expected
     * @param z                when {@code true}, one additional reference on the proxy participant
     *                         is expected while the request/reply is in flight (3 instead of 2)
     * @param throwingConsumer injects the faked incoming message into the runtime
     */
    private void srqSrp_error_srpExpired(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);

        // Spy on every in-process stub created by the factory so that the delivery of the
        // incoming subscription request can be observed. The two lambdas are nested, so their
        // parameters need distinct names.
        CountDownLatch requestDelivered = new CountDownLatch(1);
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessMessagingStub = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                checkIncominMsgAndRefCounts(z, transmitInvocation, Message.MessageType.VALUE_MESSAGE_TYPE_BROADCAST_SUBSCRIPTION_REQUEST);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
                requestDelivered.countDown();
                return null;
            }).when(inProcessMessagingStub).transmit(
                    ArgumentMatchers.any(ImmutableMessage.class),
                    ArgumentMatchers.any(SuccessAction.class),
                    ArgumentMatchers.any(FailureAction.class));
            return inProcessMessagingStub;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));

        // Outgoing side: fail the reply with a delay that is one millisecond longer than the
        // time remaining until the message's TTL.
        CountDownLatch replyFailed = new CountDownLatch(1);
        Mockito.doAnswer(outgoingInvocation -> {
            ImmutableMessage immutableMessage = (ImmutableMessage) outgoingInvocation.getArguments()[0];
            Assert.assertEquals(this.proxyParticipantId, immutableMessage.getRecipient());
            Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, immutableMessage.getType());
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
            long delayBeyondTtlMs = (immutableMessage.getTtlMs() - System.currentTimeMillis()) + 1;
            FailureAction failureAction = (FailureAction) outgoingInvocation.getArguments()[2];
            failureAction.execute(new JoynrDelayMessageException(delayBeyondTtlMs, "test expired reply in MessageWorker"));
            // The failure action alone is not expected to change the reference counts.
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
            replyFailed.countDown();
            return null;
        }).when(iMessagingStub).transmit(
                ArgumentMatchers.any(ImmutableMessage.class),
                ArgumentMatchers.any(SuccessAction.class),
                ArgumentMatchers.any(FailureAction.class));

        // NOTE(review): 512 is presumably the subscription validity in ms - confirm in fakeIncomingSrq.
        fakeIncomingSrq(throwingConsumer, JoynrUtil.createUuidString(), 512L);
        waitFor(requestDelivered, DEFAULT_WAIT_TIME);
        waitFor(replyFailed, DEFAULT_WAIT_TIME);

        // After this pause the proxy participant is expected to be back at a single reference.
        sleep(612L);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, 1);

        // A broadcast fired now must not produce any outgoing message.
        this.testProvider.fireIntBroadcast(44);
        sleep(200L);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(iMessagingStub);
    }

    /**
     * Runs {@link #srqSrp_error_srpExpired} with {@code replyToAddress},
     * {@code mqttMessagingStubMock}, the flag set to {@code true} and {@code insertMqttMessage}.
     */
    @Test
    public void mqtt_srqSrp_error_srpExpired() {
        srqSrp_error_srpExpired(
                replyToAddress,
                mqttMessagingStubMock,
                true,
                insertMqttMessage);
    }

    /**
     * Runs {@link #srqSrp_error_srpExpired} with {@code wsClientAddress},
     * {@code webSocketClientMessagingStubMock}, the flag set to {@code false} and
     * {@code insertWsMessage}.
     */
    @Test
    public void ws_srqSrp_error_srpExpired() {
        srqSrp_error_srpExpired(
                wsClientAddress,
                webSocketClientMessagingStubMock,
                false,
                insertWsMessage);
    }

    /**
     * Shared scenario: an incoming one-way request for {@code methodFireAndForgetWithoutParams}
     * is injected and delivered through the in-process stub. The reference counts of
     * {@code "provider-1"} and of the proxy participant are expected to be 1 after delivery.
     *
     * @param address          address used to create the routing entry of the proxy
     * @param throwingConsumer injects the faked incoming message into the runtime
     */
    private void oneWay_success(Address address, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);

        // Spy on every in-process stub created by the factory so that the delivery of the
        // incoming one-way request can be observed. The two lambdas are nested, so their
        // parameters need distinct names.
        CountDownLatch requestDelivered = new CountDownLatch(1);
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessMessagingStub = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                checkIncominMsgAndRefCounts(false, transmitInvocation, Message.MessageType.VALUE_MESSAGE_TYPE_ONE_WAY);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, 1L);
                requestDelivered.countDown();
                return null;
            }).when(inProcessMessagingStub).transmit(
                    ArgumentMatchers.any(ImmutableMessage.class),
                    ArgumentMatchers.any(SuccessAction.class),
                    ArgumentMatchers.any(FailureAction.class));
            return inProcessMessagingStub;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));

        OneWayRequest oneWayRequest = new OneWayRequest("methodFireAndForgetWithoutParams", new Object[0], new Class[0]);
        MutableMessage oneWayMessage = this.messageFactory.createOneWayRequest(this.proxyParticipantId, "provider-1", oneWayRequest, this.defaultMessagingQos);
        oneWayMessage.setReplyTo(RoutingTypesUtil.toAddressString(this.replyToAddress));
        try {
            throwingConsumer.accept(oneWayMessage);
        } catch (Exception e) {
            Assert.fail("fake incoming one way request failed: " + e);
        }

        waitFor(requestDelivered, DEFAULT_WAIT_TIME);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
    }

    /** Runs {@link #oneWay_success} with {@code replyToAddress} and {@code insertMqttMessage}. */
    @Test
    public void mqtt_oneWay_success() {
        oneWay_success(
                replyToAddress,
                insertMqttMessage);
    }

    /** Runs {@link #oneWay_success} with {@code wsClientAddress} and {@code insertWsMessage}. */
    @Test
    public void ws_oneWay_success() {
        oneWay_success(
                wsClientAddress,
                insertWsMessage);
    }

    /**
     * Shared scenario: an incoming multicast subscription request for {@code emptyBroadcast} is
     * answered with a subscription reply, two broadcasts are fired and the resulting multicast
     * messages are verified, then a subscription stop is injected.
     *
     * <p>The routing reference counts of {@code "provider-1"} and of the proxy participant are
     * checked at every step via {@code checkRefCnt}.
     *
     * @param address          address used to create the routing entry of the proxy
     * @param iMessagingStub   mocked stub on which the outgoing reply and multicasts are expected
     * @param z                when {@code true}, one additional reference on the proxy participant
     *                         is expected while the request/reply is in flight (2 instead of 1)
     *                         and each broadcast is expected once per entry of {@code gbids}
     *                         instead of once
     * @param throwingConsumer injects the faked incoming message into the runtime
     */
    private void mrqSrp_success(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        String multicastId = MulticastIdUtil.createMulticastId("provider-1", "emptyBroadcast", new String[0]);

        // Spy on every in-process stub created by the factory so that the delivery of the
        // incoming subscription request can be observed. The two lambdas are nested, so their
        // parameters need distinct names.
        CountDownLatch requestDelivered = new CountDownLatch(1);
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessMessagingStub = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                checkIncominMsgAndRefCounts(z, transmitInvocation, Message.MessageType.VALUE_MESSAGE_TYPE_MULTICAST_SUBSCRIPTION_REQUEST);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, z ? 2L : 1L);
                requestDelivered.countDown();
                return null;
            }).when(inProcessMessagingStub).transmit(
                    ArgumentMatchers.any(ImmutableMessage.class),
                    ArgumentMatchers.any(SuccessAction.class),
                    ArgumentMatchers.any(FailureAction.class));
            return inProcessMessagingStub;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));

        // Two broadcasts are fired below; with z set each one is expected gbids.length times.
        int expectedMulticasts = 2 * (z ? this.gbids.length : 1);
        CountDownLatch replySent = new CountDownLatch(1);
        CountDownLatch multicastsSent = new CountDownLatch(expectedMulticasts);
        Mockito.doAnswer(outgoingInvocation -> {
            ImmutableMessage immutableMessage = (ImmutableMessage) outgoingInvocation.getArguments()[0];
            if (Message.MessageType.VALUE_MESSAGE_TYPE_MULTICAST.equals(immutableMessage.getType())) {
                // Multicast messages are addressed to the multicast id, not to the proxy.
                Assert.assertEquals(multicastId, immutableMessage.getRecipient());
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, 1L);
                multicastsSent.countDown();
                return null;
            }
            Assert.assertEquals(this.proxyParticipantId, immutableMessage.getRecipient());
            Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, immutableMessage.getType());
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, z ? 2L : 1L);
            ((SuccessAction) outgoingInvocation.getArguments()[1]).execute();
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, 1L);
            replySent.countDown();
            return null;
        }).when(iMessagingStub).transmit(
                ArgumentMatchers.any(ImmutableMessage.class),
                ArgumentMatchers.any(SuccessAction.class),
                ArgumentMatchers.any(FailureAction.class));

        String subscriptionId = JoynrUtil.createUuidString();
        OnChangeSubscriptionQos subscriptionQos = new OnChangeSubscriptionQos().setMinIntervalMs(0L).setValidityMs(5000L);
        MulticastSubscriptionRequest multicastSubscriptionRequest = new MulticastSubscriptionRequest(multicastId, subscriptionId, "emptyBroadcast", subscriptionQos);
        MutableMessage subscriptionRequestMessage = this.messageFactory.createSubscriptionRequest(this.proxyParticipantId, "provider-1", multicastSubscriptionRequest, this.defaultMessagingQos);
        subscriptionRequestMessage.setReplyTo(RoutingTypesUtil.toAddressString(this.replyToAddress));
        try {
            throwingConsumer.accept(subscriptionRequestMessage);
        } catch (Exception e) {
            Assert.fail("fake incoming subscription request failed: " + e);
        }
        waitFor(requestDelivered, DEFAULT_WAIT_TIME);
        waitFor(replySent, DEFAULT_WAIT_TIME);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, 1);

        sleep(256L);
        this.testProvider.fireEmptyBroadcast(new String[0]);
        this.testProvider.fireEmptyBroadcast(new String[0]);
        try {
            Assert.assertTrue(multicastsSent.await(DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Assert.fail("Wait for publication failed: " + e);
        }
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_MULTICAST, expectedMulticasts);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);

        waitFor(fakeIncomingSst(throwingConsumer, subscriptionId, false), DEFAULT_WAIT_TIME);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(iMessagingStub);
    }

    /**
     * Runs {@link #mrqSrp_success} with {@code replyToAddress}, {@code mqttMessagingStubMock},
     * the flag set to {@code true} and {@code insertMqttMessage}.
     */
    @Test
    public void mqtt_mrqSrp_success() {
        mrqSrp_success(
                replyToAddress,
                mqttMessagingStubMock,
                true,
                insertMqttMessage);
    }

    /**
     * Registers the proxy participant as multicast receiver for {@code emptyBroadcast} of
     * {@code "provider-1"}, runs {@link #mrqSrp_success} with the WebSocket client address and
     * stub mock, removes the receiver again and finally verifies that the MQTT stub mock saw
     * {@code 2 * gbids.length} multicast messages.
     */
    @Test
    public void ws_mrqSrp_success() {
        final String providerParticipantId = "provider-1";
        final String multicastId = MulticastIdUtil.createMulticastId(providerParticipantId, "emptyBroadcast", new String[0]);
        final MulticastReceiverRegistrar registrar = (MulticastReceiverRegistrar) injector.getInstance(MulticastReceiverRegistrar.class);

        registrar.addMulticastReceiver(multicastId, proxyParticipantId, providerParticipantId);
        mrqSrp_success(wsClientAddress, webSocketClientMessagingStubMock, false, insertWsMessage);
        registrar.removeMulticastReceiver(multicastId, proxyParticipantId, providerParticipantId);

        final int expectedMqttMulticasts = 2 * gbids.length;
        verifyOutgoing(mqttMessagingStubMock, Message.MessageType.VALUE_MESSAGE_TYPE_MULTICAST, expectedMqttMulticasts);
    }

    /**
     * Shared scenario: an incoming attribute subscription request for {@code enumAttribute} is
     * answered with a subscription reply, two publications are expected after one attribute
     * change, and the subscription is then ended by an injected subscription stop, after which a
     * further attribute change must not produce an outgoing message.
     *
     * <p>The routing reference counts of {@code "provider-1"} and of the proxy participant are
     * checked via {@code checkRefCnt}.
     *
     * @param address          address used to create the routing entry of the proxy
     * @param iMessagingStub   mocked stub on which the outgoing reply and publications are expected
     * @param z                when {@code true}, one additional reference on the proxy participant
     *                         is expected while the request/reply is in flight (3 instead of 2)
     * @param throwingConsumer injects the faked incoming message into the runtime
     */
    private void attributeSrqSrp_success(Address address, IMessagingStub iMessagingStub, boolean z, ThrowingConsumer<MutableMessage> throwingConsumer) {
        createProxyRoutingEntry(address);
        this.testProvider.setEnumAttribute(TestEnum.ONE);

        // Spy on every in-process stub created by the factory so that the delivery of the
        // incoming subscription request can be observed. The two lambdas are nested, so their
        // parameters need distinct names.
        CountDownLatch requestDelivered = new CountDownLatch(1);
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessMessagingStub = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                checkIncominMsgAndRefCounts(z, transmitInvocation, Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REQUEST);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                checkRefCnt("provider-1", 1L);
                checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
                requestDelivered.countDown();
                return null;
            }).when(inProcessMessagingStub).transmit(
                    ArgumentMatchers.any(ImmutableMessage.class),
                    ArgumentMatchers.any(SuccessAction.class),
                    ArgumentMatchers.any(FailureAction.class));
            return inProcessMessagingStub;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));

        CountDownLatch replySent = new CountDownLatch(1);
        // NOTE(review): two publications are awaited although only one change is fired below;
        // presumably the first one is the initial value sent on subscription - confirm.
        CountDownLatch publicationsSent = new CountDownLatch(2);
        Mockito.doAnswer(outgoingInvocation -> {
            ImmutableMessage immutableMessage = (ImmutableMessage) outgoingInvocation.getArguments()[0];
            Assert.assertEquals(this.proxyParticipantId, immutableMessage.getRecipient());
            if (Message.MessageType.VALUE_MESSAGE_TYPE_PUBLICATION.equals(immutableMessage.getType())) {
                // Only the provider count is checked for publications in this scenario.
                checkRefCnt("provider-1", 1L);
                publicationsSent.countDown();
                return null;
            }
            Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, immutableMessage.getType());
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, z ? 3L : 2L);
            ((SuccessAction) outgoingInvocation.getArguments()[1]).execute();
            checkRefCnt("provider-1", 1L);
            checkRefCnt(this.proxyParticipantId, 2L);
            replySent.countDown();
            return null;
        }).when(iMessagingStub).transmit(
                ArgumentMatchers.any(ImmutableMessage.class),
                ArgumentMatchers.any(SuccessAction.class),
                ArgumentMatchers.any(FailureAction.class));

        String subscriptionId = JoynrUtil.createUuidString();
        OnChangeSubscriptionQos subscriptionQos = new OnChangeSubscriptionQos().setMinIntervalMs(0L).setValidityMs(5000L);
        SubscriptionRequest subscriptionRequest = new SubscriptionRequest(subscriptionId, "enumAttribute", subscriptionQos);
        MutableMessage subscriptionRequestMessage = this.messageFactory.createSubscriptionRequest(this.proxyParticipantId, "provider-1", subscriptionRequest, this.defaultMessagingQos);
        subscriptionRequestMessage.setReplyTo(RoutingTypesUtil.toAddressString(this.replyToAddress));
        try {
            throwingConsumer.accept(subscriptionRequestMessage);
        } catch (Exception e) {
            Assert.fail("fake incoming subscription request failed: " + e);
        }
        waitFor(requestDelivered, DEFAULT_WAIT_TIME);
        waitFor(replySent, DEFAULT_WAIT_TIME);
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_SUBSCRIPTION_REPLY, 1);

        this.testProvider.enumAttributeChanged(TestEnum.TWO);
        try {
            Assert.assertTrue(publicationsSent.await(DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            // Only interruption is translated into a failure message; an assertion failure
            // from assertTrue propagates unchanged, as in the sibling scenarios.
            Assert.fail("Wait for publication failed: " + e);
        }
        verifyOutgoing(iMessagingStub, Message.MessageType.VALUE_MESSAGE_TYPE_PUBLICATION, 2);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 2L);

        waitFor(fakeIncomingSst(throwingConsumer, subscriptionId, true), DEFAULT_WAIT_TIME);

        // A change after the subscription stop must not produce any further outgoing message.
        this.testProvider.enumAttributeChanged(TestEnum.ZERO);
        sleep(200L);
        checkRefCnt("provider-1", 1L);
        checkRefCnt(this.proxyParticipantId, 1L);
        Mockito.verifyNoMoreInteractions(iMessagingStub);
    }

    /**
     * Runs {@link #attributeSrqSrp_success} with {@code replyToAddress},
     * {@code mqttMessagingStubMock}, the flag set to {@code true} and {@code insertMqttMessage}.
     */
    @Test
    public void mqtt_attributeSrqSrp_success() {
        attributeSrqSrp_success(
                replyToAddress,
                mqttMessagingStubMock,
                true,
                insertMqttMessage);
    }

    /**
     * Runs {@link #attributeSrqSrp_success} with {@code wsClientAddress},
     * {@code webSocketClientMessagingStubMock}, the flag set to {@code false} and
     * {@code insertWsMessage}.
     */
    @Test
    public void ws_attributeSrqSrp_success() {
        attributeSrqSrp_success(
                wsClientAddress,
                webSocketClientMessagingStubMock,
                false,
                insertWsMessage);
    }

    /**
     * Registers a stateless async callback, injects a faked incoming reply addressed to the
     * callback's calculated participant id via {@code insertMqttMessage}, and waits until the
     * reply has been delivered through the in-process stub and the callback's latch has opened.
     * The reference count of the stateless participant id is expected to be 1 before, during and
     * after the delivery.
     */
    @Test
    public void mqtt_statelessReply_success() throws NoSuchMethodException, SecurityException {
        createProxyRoutingEntry(this.replyToAddress);

        CountDownLatch callbackInvoked = new CountDownLatch(1);
        StatelessAsyncCallback statelessAsyncCallback = new StatelessAsyncCallback(callbackInvoked);
        this.joynrRuntime.registerStatelessAsyncCallback(statelessAsyncCallback);
        StatelessAsyncIdCalculator statelessAsyncIdCalculator = this.injector.getInstance(StatelessAsyncIdCalculator.class);
        String statelessParticipantId = statelessAsyncIdCalculator.calculateParticipantId(ProxyErrorsTest.TestProxyWrongVersion.INTERFACE_NAME, statelessAsyncCallback);
        checkRefCnt(statelessParticipantId, 1L);

        // Spy on every in-process stub created by the factory so that the delivery of the
        // incoming reply can be observed. The two lambdas are nested, so their parameters need
        // distinct names.
        CountDownLatch replyDelivered = new CountDownLatch(1);
        Mockito.doAnswer(createInvocation -> {
            InProcessMessagingStub inProcessMessagingStub = Mockito.spy((InProcessMessagingStub) createInvocation.callRealMethod());
            Mockito.doAnswer(transmitInvocation -> {
                ImmutableMessage immutableMessage = (ImmutableMessage) transmitInvocation.getArguments()[0];
                Assert.assertEquals(Message.MessageType.VALUE_MESSAGE_TYPE_REPLY, immutableMessage.getType());
                Assert.assertEquals(statelessParticipantId, immutableMessage.getRecipient());
                checkRefCnt(statelessParticipantId, 1L);
                SuccessAction successAction = (SuccessAction) transmitInvocation.getArguments()[1];
                transmitInvocation.callRealMethod();
                successAction.execute();
                replyDelivered.countDown();
                return null;
            }).when(inProcessMessagingStub).transmit(
                    ArgumentMatchers.any(ImmutableMessage.class),
                    ArgumentMatchers.any(SuccessAction.class),
                    ArgumentMatchers.any(FailureAction.class));
            return inProcessMessagingStub;
        }).when(this.inProcessMessagingStubFactorySpy).create(ArgumentMatchers.any(InProcessAddress.class));

        try {
            // The request/reply id is derived from the sayHello method of the test proxy.
            String requestReplyId = statelessAsyncIdCalculator.calculateStatelessCallbackRequestReplyId(testProxy.class.getMethod("sayHello", MessageIdCallback.class));
            Reply reply = new Reply(requestReplyId, new Object[]{"result"});
            this.insertMqttMessage.accept(this.messageFactory.createReply("provider-1", statelessParticipantId, reply, new MessagingQos()));
        } catch (Exception e) {
            Assert.fail("fake incoming stateless reply failed: " + e);
        }

        waitFor(replyDelivered, DEFAULT_WAIT_TIME);
        waitFor(callbackInvoked, DEFAULT_WAIT_TIME);
        checkRefCnt(statelessParticipantId, 1L);
    }
}
