package org.apache.pulsar.client.api;

import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javassist.bytecode.Opcode;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/ClientErrorsTest.class */
public class ClientErrorsTest {
    /** Mock broker shared by the tests of this class; assigned in {@code setup()} and stopped in {@code teardown()}. */
    MockBrokerService mockBrokerService;

    // Ports obtained from PortManager and handed to the MockBrokerService constructor.
    private static final int WEB_SERVICE_PORT = PortManager.nextFreePort();
    private static final int WEB_SERVICE_TLS_PORT = PortManager.nextFreePort();
    private static final int BROKER_SERVICE_PORT = PortManager.nextFreePort();
    private static final int BROKER_SERVICE_TLS_PORT = PortManager.nextFreePort();

    /** Pause, in milliseconds, passed to {@code Thread.sleep} between steps of {@code testMockBrokerService}. */
    private static final int ASYNC_EVENT_COMPLETION_WAIT = 100;

    /** Marker text; the "fail without retry" tests compare exception messages against this value. */
    private static final String ASSERTION_ERROR = "AssertionError";

    /**
     * Builds the mock broker on the four pre-allocated ports and starts it before the tests of this class run.
     */
    @BeforeClass
    public void setup() {
        mockBrokerService = new MockBrokerService(
                WEB_SERVICE_PORT,
                WEB_SERVICE_TLS_PORT,
                BROKER_SERVICE_PORT,
                BROKER_SERVICE_TLS_PORT);
        mockBrokerService.start();
    }

    /**
     * Stops the mock broker that {@code setup()} started.
     */
    @AfterClass
    public void teardown() {
        mockBrokerService.stop();
    }

    /**
     * Smoke test of the mock broker: without installing any custom handler, a client must be able to
     * subscribe, create a producer, send one message, unsubscribe and close everything without a
     * client-side exception.
     */
    @Test
    public void testMockBrokerService() throws Exception {
        PulsarClient client = null;
        try {
            client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://prop/use/ns/t1")
                    .subscriptionName("sub1")
                    .subscribe();
            Producer<byte[]> producer = client.newProducer().topic("persistent://prop/use/ns/t1").create();
            Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
            producer.send("message".getBytes());
            Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
            consumer.unsubscribe();
            producer.close();
            consumer.close();
        } catch (Exception e) {
            // Keep the original exception as the cause so the report shows which operation failed.
            Assert.fail("None of the mocked operations should throw a client side exception", e);
        } finally {
            // Close the client on the failure path too, so it does not outlive this test.
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Runs {@code producerCreateFailWithoutRetry} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testProducerCreateFailWithoutRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        producerCreateFailWithoutRetry(topic);
    }

    /**
     * Runs {@code producerCreateFailWithoutRetry} against the topic {@code persistent://prop/use/ns/part-t1}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedProducerCreateFailWithoutRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        producerCreateFailWithoutRetry(topic);
    }

    /**
     * Checks that producer creation fails with an authorization exception and is not retried.
     *
     * <p>The installed handler answers every producer request with {@code AuthorizationError}, except the
     * second one, which is answered with {@code UnknownError} carrying the {@code ASSERTION_ERROR} text.
     * Seeing that text in the exception therefore means a second request was sent.
     *
     * @param str topic on which the producer is created
     * @throws Exception if building or closing the client fails
     */
    private void producerCreateFailWithoutRetry(String str) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger producerRequests = new AtomicInteger(0);
            mockBrokerService.setHandleProducer((ctx, cmd) -> {
                if (producerRequests.incrementAndGet() == 2) {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.UnknownError, ASSERTION_ERROR));
                } else {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.AuthorizationError, "msg"));
                }
            });
            try {
                client.newProducer().topic(str).create();
                // Every producer request is answered with an error, so success here is a test failure.
                // AssertionError is not an Exception, so the catch below does not swallow it.
                Assert.fail("Producer create should have failed with an authorization error");
            } catch (Exception e) {
                // Constant-first comparison: the exception message may be null.
                if (ASSERTION_ERROR.equals(e.getMessage())) {
                    Assert.fail("Producer create should not retry on auth error");
                }
                Assert.assertTrue(e instanceof PulsarClientException.AuthorizationException);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleProducer();
            client.close();
        }
    }

    /**
     * Runs {@code producerCreateSuccessAfterRetry} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testProducerCreateSuccessAfterRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        producerCreateSuccessAfterRetry(topic);
    }

    /**
     * Runs {@code producerCreateSuccessAfterRetry} against the topic {@code persistent://prop/use/ns/part-t1}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedProducerCreateSuccessAfterRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        producerCreateSuccessAfterRetry(topic);
    }

    /**
     * Checks that producer creation eventually succeeds when the broker first reports
     * {@code ServiceNotReady}.
     *
     * <p>The installed handler answers the second producer request with a producer-success response and
     * every other request with {@code ServiceNotReady}.
     *
     * @param str topic on which the producer is created
     * @throws Exception if building or closing the client fails
     */
    private void producerCreateSuccessAfterRetry(String str) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger producerRequests = new AtomicInteger(0);
            mockBrokerService.setHandleProducer((ctx, cmd) -> {
                if (producerRequests.incrementAndGet() == 2) {
                    ctx.writeAndFlush(Commands.newProducerSuccess(cmd.getRequestId(),
                            "default-producer", SchemaVersion.Empty));
                } else {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.ServiceNotReady, "msg"));
                }
            });
            try {
                client.newProducer().topic(str).create();
            } catch (Exception e) {
                // Keep the cause so the failure report shows why creation failed.
                Assert.fail("Should not fail", e);
            }
        } finally {
            // Always restore the shared mock broker, even when the assertion above failed.
            mockBrokerService.resetHandleProducer();
            client.close();
        }
    }

    /**
     * Runs {@code producerCreateFailAfterRetryTimeout} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testProducerCreateFailAfterRetryTimeout() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        producerCreateFailAfterRetryTimeout(topic);
    }

    /**
     * Runs {@code producerCreateFailAfterRetryTimeout} against the topic
     * {@code persistent://prop/use/ns/part-t1} (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedProducerCreateFailAfterRetryTimeout() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        producerCreateFailAfterRetryTimeout(topic);
    }

    /**
     * Checks that producer creation fails with a {@code PulsarClientException} when the broker keeps
     * answering {@code ServiceNotReady} and the client's 1 second operation timeout is exceeded.
     *
     * <p>The installed handler always answers {@code ServiceNotReady}; for the second producer request it
     * first sleeps 2 seconds, i.e. longer than the configured operation timeout.
     *
     * @param str topic on which the producer is created
     * @throws Exception if building or closing the client fails
     */
    private void producerCreateFailAfterRetryTimeout(String str) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT)
                .operationTimeout(1, TimeUnit.SECONDS)
                .build();
        try {
            AtomicInteger producerRequests = new AtomicInteger(0);
            mockBrokerService.setHandleProducer((ctx, cmd) -> {
                if (producerRequests.incrementAndGet() == 2) {
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException ignored) {
                        // Restore the interrupt flag instead of silently dropping it; still send the reply.
                        Thread.currentThread().interrupt();
                    }
                }
                ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                        PulsarApi.ServerError.ServiceNotReady, "msg"));
            });
            try {
                client.newProducer().topic(str).create();
                // AssertionError is not an Exception, so the catch below does not swallow this failure.
                Assert.fail("Should have failed");
            } catch (Exception e) {
                Assert.assertTrue(e instanceof PulsarClientException);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleProducer();
            client.close();
        }
    }

    /**
     * Runs {@code producerFailDoesNotFailOtherProducer} with the topics {@code persistent://prop/use/ns/t1}
     * and {@code persistent://prop/use/ns/t2}.
     */
    @Test
    public void testProducerFailDoesNotFailOtherProducer() throws Exception {
        final String firstTopic = "persistent://prop/use/ns/t1";
        final String secondTopic = "persistent://prop/use/ns/t2";
        producerFailDoesNotFailOtherProducer(firstTopic, secondTopic);
    }

    /**
     * Runs {@code producerFailDoesNotFailOtherProducer} with the topics
     * {@code persistent://prop/use/ns/part-t1} and {@code persistent://prop/use/ns/part-t2}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedProducerFailDoesNotFailOtherProducer() throws Exception {
        final String firstTopic = "persistent://prop/use/ns/part-t1";
        final String secondTopic = "persistent://prop/use/ns/part-t2";
        producerFailDoesNotFailOtherProducer(firstTopic, secondTopic);
    }

    /**
     * Checks that a failed producer creation leaves a previously created producer connected.
     *
     * <p>The installed handler answers the second producer request with {@code AuthenticationError} and
     * every other request with a producer-success response.
     *
     * @param str  topic of the first producer, which is expected to be created and stay connected
     * @param str2 topic of the second producer, whose creation is expected to fail
     * @throws Exception if building the client, creating the first producer or closing the client fails
     */
    private void producerFailDoesNotFailOtherProducer(String str, String str2) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger producerRequests = new AtomicInteger(0);
            mockBrokerService.setHandleProducer((ctx, cmd) -> {
                if (producerRequests.incrementAndGet() == 2) {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.AuthenticationError, "msg"));
                } else {
                    ctx.writeAndFlush(Commands.newProducerSuccess(cmd.getRequestId(),
                            "default-producer", SchemaVersion.Empty));
                }
            });
            // create() returns Producer<byte[]>; the cast is required to reach ProducerBase.isConnected().
            ProducerBase<byte[]> firstProducer = (ProducerBase<byte[]>) client.newProducer().topic(str).create();
            ProducerBase<byte[]> secondProducer = null;
            try {
                secondProducer = (ProducerBase<byte[]>) client.newProducer().topic(str2).create();
                // AssertionError is not an Exception, so the catch below does not swallow this failure.
                Assert.fail("Should have failed");
            } catch (Exception expected) {
                // Expected: creation of the second producer is rejected by the handler above.
            }
            Assert.assertTrue(firstProducer.isConnected());
            Assert.assertFalse(secondProducer != null && secondProducer.isConnected());
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleProducer();
            client.close();
        }
    }

    /**
     * Runs {@code producerContinuousRetryAfterSendFail} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testProducerContinuousRetryAfterSendFail() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        producerContinuousRetryAfterSendFail(topic);
    }

    /**
     * Runs {@code producerContinuousRetryAfterSendFail} against the topic
     * {@code persistent://prop/use/ns/part-t1} (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedProducerContinuousRetryAfterSendFail() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        producerContinuousRetryAfterSendFail(topic);
    }

    /**
     * Checks that creating a producer and sending one message completes without an exception even though
     * the first send is rejected and several producer requests are refused.
     *
     * <p>Producer handler: requests number 1 and 5 get a producer-success response, all others get
     * {@code PersistenceError}. Send handler: the first send gets a send error with
     * {@code PersistenceError}, every later send gets a send receipt.
     *
     * @param str topic on which the producer is created
     * @throws Exception if building or closing the client fails
     */
    private void producerContinuousRetryAfterSendFail(String str) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger producerRequests = new AtomicInteger(0);
            mockBrokerService.setHandleProducer((ctx, cmd) -> {
                int request = producerRequests.incrementAndGet();
                if (request == 1 || request == 5) {
                    ctx.writeAndFlush(Commands.newProducerSuccess(cmd.getRequestId(),
                            "default-producer", SchemaVersion.Empty));
                } else {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.PersistenceError, "msg"));
                }
            });
            AtomicInteger sendRequests = new AtomicInteger(0);
            mockBrokerService.setHandleSend((ctx, cmd, payload) -> {
                if (sendRequests.incrementAndGet() == 1) {
                    ctx.writeAndFlush(Commands.newSendError(0L, 0L,
                            PulsarApi.ServerError.PersistenceError, "Send Failed"));
                } else {
                    ctx.writeAndFlush(Commands.newSendReceipt(0L, 0L, 1L, 1L));
                }
            });
            try {
                client.newProducer().topic(str).create().send("message".getBytes());
            } catch (Exception e) {
                // Keep the cause so the failure report shows which step failed.
                Assert.fail("Should not fail", e);
            }
        } finally {
            // Always restore the shared mock broker, even when the assertion above failed.
            mockBrokerService.resetHandleProducer();
            mockBrokerService.resetHandleSend();
            client.close();
        }
    }

    /**
     * Runs {@code subscribeFailWithoutRetry} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testSubscribeFailWithoutRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        subscribeFailWithoutRetry(topic);
    }

    /**
     * Runs {@code subscribeFailWithoutRetry} against the topic {@code persistent://prop/use/ns/part-t1}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedSubscribeFailWithoutRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        subscribeFailWithoutRetry(topic);
    }

    /**
     * Subscribes through the binary service URL while the mock broker closes the channel on the first
     * topic lookup and answers every later lookup with a {@code Connect} response pointing back at the
     * same service URL. Partition-metadata requests are answered with 0 partitions.
     */
    @Test
    public void testLookupWithDisconnection() throws Exception {
        final String serviceUrl = "pulsar://127.0.0.1:" + BROKER_SERVICE_PORT;
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            AtomicInteger lookupRequests = new AtomicInteger(0);
            mockBrokerService.setHandlePartitionLookup((ctx, cmd) -> {
                ctx.writeAndFlush(Commands.newPartitionMetadataResponse(0, cmd.getRequestId()));
            });
            mockBrokerService.setHandleLookup((ctx, cmd) -> {
                if (lookupRequests.incrementAndGet() == 1) {
                    // Drop the connection instead of answering the first lookup.
                    ctx.close();
                } else {
                    ctx.writeAndFlush(Commands.newLookupResponse(serviceUrl, (String) null, true,
                            PulsarApi.CommandLookupTopicResponse.LookupType.Connect, cmd.getRequestId(), false));
                }
            });
            try {
                client.newConsumer().topic("persistent://prop/use/ns/t1").subscriptionName("sub1").subscribe();
            } catch (Exception e) {
                // NOTE(review): this catch has the same shape as the one in subscribeFailWithoutRetry, although
                // no handler installed here sends a persistence error - confirm the intended expectation.
                // Constant-first comparison: the exception message may be null.
                if (ASSERTION_ERROR.equals(e.getMessage())) {
                    Assert.fail("Subscribe should not retry on persistence error");
                }
                Assert.assertTrue(e instanceof PulsarClientException.BrokerPersistenceException);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandlePartitionLookup();
            mockBrokerService.resetHandleLookup();
            client.close();
        }
    }

    /**
     * Checks that subscribing fails with a broker persistence exception and is not retried.
     *
     * <p>The installed handler answers every subscribe request with {@code PersistenceError}, except the
     * second one, which is answered with {@code UnknownError} carrying the {@code ASSERTION_ERROR} text.
     * Seeing that text in the exception therefore means a second request was sent.
     *
     * @param str topic to subscribe to
     * @throws Exception if building or closing the client fails
     */
    private void subscribeFailWithoutRetry(String str) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger subscribeRequests = new AtomicInteger(0);
            mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
                if (subscribeRequests.incrementAndGet() == 2) {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.UnknownError, ASSERTION_ERROR));
                } else {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.PersistenceError, "msg"));
                }
            });
            try {
                client.newConsumer().topic(str).subscriptionName("sub1").subscribe();
                // Every subscribe request is answered with an error, so success here is a test failure.
                // AssertionError is not an Exception, so the catch below does not swallow it.
                Assert.fail("Subscribe should have failed with a persistence error");
            } catch (Exception e) {
                // Constant-first comparison: the exception message may be null.
                if (ASSERTION_ERROR.equals(e.getMessage())) {
                    Assert.fail("Subscribe should not retry on persistence error");
                }
                Assert.assertTrue(e instanceof PulsarClientException.BrokerPersistenceException);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleSubscribe();
            client.close();
        }
    }

    /**
     * Runs {@code subscribeSuccessAfterRetry} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testSubscribeSuccessAfterRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        subscribeSuccessAfterRetry(topic);
    }

    /**
     * Runs {@code subscribeSuccessAfterRetry} against the topic {@code persistent://prop/use/ns/part-t1}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedSubscribeSuccessAfterRetry() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        subscribeSuccessAfterRetry(topic);
    }

    /**
     * Checks that subscribing eventually succeeds when the broker first reports {@code ServiceNotReady}.
     *
     * <p>The installed handler answers the second subscribe request with a success response and every
     * other request with {@code ServiceNotReady}.
     *
     * @param str topic to subscribe to
     * @throws Exception if building or closing the client fails
     */
    private void subscribeSuccessAfterRetry(String str) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger subscribeRequests = new AtomicInteger(0);
            mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
                if (subscribeRequests.incrementAndGet() == 2) {
                    ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
                } else {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.ServiceNotReady, "msg"));
                }
            });
            try {
                client.newConsumer().topic(str).subscriptionName("sub1").subscribe();
            } catch (Exception e) {
                // Keep the cause so the failure report shows why subscribing failed.
                Assert.fail("Should not fail", e);
            }
        } finally {
            // Always restore the shared mock broker, even when the assertion above failed.
            mockBrokerService.resetHandleSubscribe();
            client.close();
        }
    }

    /**
     * Runs {@code subscribeFailAfterRetryTimeout} against the topic {@code persistent://prop/use/ns/t1}.
     */
    @Test
    public void testSubscribeFailAfterRetryTimeout() throws Exception {
        final String topic = "persistent://prop/use/ns/t1";
        subscribeFailAfterRetryTimeout(topic);
    }

    /**
     * Runs {@code subscribeFailAfterRetryTimeout} against the topic {@code persistent://prop/use/ns/part-t1}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedSubscribeFailAfterRetryTimeout() throws Exception {
        final String topic = "persistent://prop/use/ns/part-t1";
        subscribeFailAfterRetryTimeout(topic);
    }

    /**
     * Checks that subscribing fails with a {@code PulsarClientException} when the broker keeps answering
     * {@code ServiceNotReady} and the client's 200 ms operation timeout is exceeded.
     *
     * <p>The installed handler always answers {@code ServiceNotReady}; for the second subscribe request it
     * first sleeps 500 ms, i.e. longer than the configured operation timeout.
     *
     * @param str topic to subscribe to
     * @throws Exception if building or closing the client fails
     */
    private void subscribeFailAfterRetryTimeout(String str) throws Exception {
        // 200 ms written as a plain literal: the previous Opcode.GOTO_W is a javassist bytecode constant
        // that merely shares this value and has nothing to do with timeouts.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT)
                .operationTimeout(200, TimeUnit.MILLISECONDS)
                .build();
        try {
            AtomicInteger subscribeRequests = new AtomicInteger(0);
            mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
                if (subscribeRequests.incrementAndGet() == 2) {
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException ignored) {
                        // Restore the interrupt flag instead of silently dropping it; still send the reply.
                        Thread.currentThread().interrupt();
                    }
                }
                ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                        PulsarApi.ServerError.ServiceNotReady, "msg"));
            });
            try {
                client.newConsumer().topic(str).subscriptionName("sub1").subscribe();
                // AssertionError is not an Exception, so the catch below does not swallow this failure.
                Assert.fail("Should have failed");
            } catch (Exception e) {
                Assert.assertTrue(e instanceof PulsarClientException);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleSubscribe();
            client.close();
        }
    }

    /**
     * Runs {@code subscribeFailDoesNotFailOtherConsumer} with the topics {@code persistent://prop/use/ns/t1}
     * and {@code persistent://prop/use/ns/t2}.
     */
    @Test
    public void testSubscribeFailDoesNotFailOtherConsumer() throws Exception {
        final String firstTopic = "persistent://prop/use/ns/t1";
        final String secondTopic = "persistent://prop/use/ns/t2";
        subscribeFailDoesNotFailOtherConsumer(firstTopic, secondTopic);
    }

    /**
     * Runs {@code subscribeFailDoesNotFailOtherConsumer} with the topics
     * {@code persistent://prop/use/ns/part-t1} and {@code persistent://prop/use/ns/part-t2}
     * (the partitioned variant, going by the test name).
     */
    @Test
    public void testPartitionedSubscribeFailDoesNotFailOtherConsumer() throws Exception {
        final String firstTopic = "persistent://prop/use/ns/part-t1";
        final String secondTopic = "persistent://prop/use/ns/part-t2";
        subscribeFailDoesNotFailOtherConsumer(firstTopic, secondTopic);
    }

    /**
     * Checks that a failed subscription leaves a previously created consumer connected.
     *
     * <p>The installed handler answers the second subscribe request with {@code AuthenticationError} and
     * every other request with a success response.
     *
     * @param str  topic of the first consumer, which is expected to subscribe and stay connected
     * @param str2 topic of the second consumer, whose subscription is expected to fail
     * @throws Exception if building the client, creating the first consumer or closing the client fails
     */
    private void subscribeFailDoesNotFailOtherConsumer(String str, String str2) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger subscribeRequests = new AtomicInteger(0);
            mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
                if (subscribeRequests.incrementAndGet() == 2) {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.AuthenticationError, "msg"));
                } else {
                    ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
                }
            });
            // subscribe() returns Consumer<byte[]>; the cast is required to reach ConsumerBase.isConnected().
            ConsumerBase<byte[]> firstConsumer = (ConsumerBase<byte[]>) client.newConsumer()
                    .topic(str).subscriptionName("sub1").subscribe();
            ConsumerBase<byte[]> secondConsumer = null;
            try {
                secondConsumer = (ConsumerBase<byte[]>) client.newConsumer()
                        .topic(str2).subscriptionName("sub1").subscribe();
                // AssertionError is not an Exception, so the catch below does not swallow this failure.
                Assert.fail("Should have failed");
            } catch (Exception expected) {
                // Expected: the second subscription is rejected by the handler above.
            }
            Assert.assertTrue(firstConsumer.isConnected());
            Assert.assertFalse(secondConsumer != null && secondConsumer.isConnected());
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleSubscribe();
            client.close();
        }
    }

    /**
     * Creates a producer on {@code persistent://prop/use/ns/multi-part-t1} while the mock broker rejects
     * the third producer request with {@code AuthorizationError} (all other requests succeed).
     *
     * <p>Expects creation to fail with an authorization exception and exactly 3 close-producer commands
     * to have reached the broker by then.
     */
    @Test
    public void testOneProducerFailShouldCloseAllProducersInPartitionedProducer() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger producerRequests = new AtomicInteger(0);
            AtomicInteger closeRequests = new AtomicInteger(0);
            mockBrokerService.setHandleProducer((ctx, cmd) -> {
                if (producerRequests.incrementAndGet() == 3) {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.AuthorizationError, "msg"));
                } else {
                    ctx.writeAndFlush(Commands.newProducerSuccess(cmd.getRequestId(),
                            "default-producer", SchemaVersion.Empty));
                }
            });
            mockBrokerService.setHandleCloseProducer((ctx, cmd) -> {
                ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
                closeRequests.incrementAndGet();
            });
            try {
                client.newProducer().topic("persistent://prop/use/ns/multi-part-t1").create();
                // AssertionError is not an Exception, so the catch below does not swallow this failure.
                Assert.fail("Should have failed with an authorization error");
            } catch (Exception e) {
                Assert.assertTrue(e instanceof PulsarClientException.AuthorizationException);
                Assert.assertEquals(closeRequests.get(), 3);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleProducer();
            mockBrokerService.resetHandleCloseProducer();
            client.close();
        }
    }

    /**
     * Subscribes to {@code persistent://prop/use/ns/multi-part-t1} while the mock broker rejects the third
     * subscribe request with {@code AuthenticationError} (all other requests succeed).
     *
     * <p>Expects the subscription to fail with an authentication exception and exactly 3 close-consumer
     * commands to have reached the broker by then.
     */
    @Test
    public void testOneConsumerFailShouldCloseAllConsumersInPartitionedConsumer() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger subscribeRequests = new AtomicInteger(0);
            AtomicInteger closeRequests = new AtomicInteger(0);
            mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
                if (subscribeRequests.incrementAndGet() == 3) {
                    ctx.writeAndFlush(Commands.newError(cmd.getRequestId(),
                            PulsarApi.ServerError.AuthenticationError, "msg"));
                } else {
                    ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
                }
            });
            mockBrokerService.setHandleCloseConsumer((ctx, cmd) -> {
                ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
                closeRequests.incrementAndGet();
            });
            try {
                client.newConsumer()
                        .topic("persistent://prop/use/ns/multi-part-t1")
                        .subscriptionName("sub1")
                        .subscribe();
                // AssertionError is not an Exception, so the catch below does not swallow this failure.
                Assert.fail("Should have failed with an authentication error");
            } catch (Exception e) {
                Assert.assertTrue(e instanceof PulsarClientException.AuthenticationException);
                Assert.assertEquals(closeRequests.get(), 3);
            }
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleSubscribe();
            mockBrokerService.resetHandleCloseConsumer();
            client.close();
        }
    }

    /**
     * Subscribes to {@code persistent://prop/use/ns/multi-part-t1} and checks that no flow command was
     * seen by the mock broker while the number of handled subscribe requests was different from 4.
     *
     * <p>The subscribe handler counts and acknowledges each request; the flow handler raises a flag when
     * it runs while that count is not 4. The flag is inspected once, right after {@code subscribe()}
     * returns.
     */
    @Test
    public void testFlowSendWhenPartitionedSubscribeCompletes() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
        try {
            AtomicInteger subscribeRequests = new AtomicInteger();
            AtomicBoolean flowSeenTooEarly = new AtomicBoolean(false);
            mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
                subscribeRequests.incrementAndGet();
                ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
            });
            mockBrokerService.setHandleFlow((ctx, cmd) -> {
                if (subscribeRequests.get() != 4) {
                    flowSeenTooEarly.set(true);
                }
            });
            client.newConsumer()
                    .topic("persistent://prop/use/ns/multi-part-t1")
                    .subscriptionName("sub1")
                    .subscribe();
            Assert.assertFalse(flowSeenTooEarly.get(),
                    "Flow command should have been sent after all 4 partitions subscribe successfully");
        } finally {
            // Always restore the shared mock broker, even when subscribe or the assertion above failed.
            mockBrokerService.resetHandleSubscribe();
            mockBrokerService.resetHandleFlow();
            client.close();
        }
    }

    /**
     * Checks that a producer can still send after its connection is closed from the broker side.
     *
     * <p>The connect handler records the most recent channel context, acknowledges the connect and
     * additionally closes the channel of the second connect it sees. After the producer is created the
     * test closes the recorded channel, sends an empty message, and expects the send handler to have run
     * and at least 3 connects to have been counted.
     */
    @Test(invocationCount = 10)
    public void testProducerReconnect() throws Exception {
        AtomicInteger connectRequests = new AtomicInteger();
        AtomicReference<ChannelHandlerContext> lastChannelCtx = new AtomicReference<>();
        AtomicBoolean messageSent = new AtomicBoolean();
        mockBrokerService.setHandleConnect((ctx, cmd) -> {
            lastChannelCtx.set(ctx);
            ctx.writeAndFlush(Commands.newConnected(cmd.getProtocolVersion()));
            if (connectRequests.incrementAndGet() == 2) {
                ctx.channel().close();
            }
        });
        mockBrokerService.setHandleProducer((ctx, cmd) -> {
            ctx.writeAndFlush(Commands.newProducerSuccess(cmd.getRequestId(),
                    "default-producer", SchemaVersion.Empty));
        });
        mockBrokerService.setHandleSend((ctx, cmd, payload) -> {
            messageSent.set(true);
            ctx.writeAndFlush(Commands.newSendReceipt(0L, 0L, 1L, 1L));
        });
        PulsarClient client = null;
        try {
            client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
            Producer<byte[]> producer = client.newProducer().topic("persistent://prop/use/ns/t1").create();
            // Close the current connection from the broker side and wait for the close to complete.
            lastChannelCtx.get().channel().close().get();
            producer.send(new byte[0]);
            Assert.assertTrue(messageSent.get());
            Assert.assertTrue(connectRequests.get() >= 3);
        } finally {
            // This test runs 10 times against the shared mock broker, so the handlers must always be
            // restored and the client closed, even when an assertion above failed.
            mockBrokerService.resetHandleConnect();
            mockBrokerService.resetHandleProducer();
            mockBrokerService.resetHandleSend();
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Checks that a consumer connects again after its connection is closed from the broker side.
     *
     * <p>The connect handler records the most recent channel context, acknowledges the connect, closes
     * the channel of the second connect it sees and releases a latch on the third. After subscribing the
     * test closes the recorded channel, waits up to 5 seconds for the latch and expects exactly 3
     * connects to have been counted.
     */
    @Test
    public void testConsumerReconnect() throws Exception {
        AtomicInteger connectRequests = new AtomicInteger();
        AtomicReference<ChannelHandlerContext> lastChannelCtx = new AtomicReference<>();
        CountDownLatch thirdConnect = new CountDownLatch(1);
        mockBrokerService.setHandleConnect((ctx, cmd) -> {
            lastChannelCtx.set(ctx);
            ctx.writeAndFlush(Commands.newConnected(cmd.getProtocolVersion()));
            // Read the counter once so both checks refer to this handler invocation's own sequence number.
            int connectNumber = connectRequests.incrementAndGet();
            if (connectNumber == 2) {
                ctx.channel().close();
            }
            if (connectNumber == 3) {
                thirdConnect.countDown();
            }
        });
        mockBrokerService.setHandleSubscribe((ctx, cmd) -> {
            ctx.writeAndFlush(Commands.newSuccess(cmd.getRequestId()));
        });
        PulsarClient client = null;
        try {
            client = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + WEB_SERVICE_PORT).build();
            client.newConsumer().topic("persistent://prop/use/ns/t1").subscriptionName("sub1").subscribe();
            lastChannelCtx.get().channel().close();
            Assert.assertTrue(thirdConnect.await(5L, TimeUnit.SECONDS),
                    "Timed out waiting for the third connect");
            Assert.assertEquals(connectRequests.get(), 3);
        } finally {
            // Always restore the shared mock broker, even when an assertion above failed.
            mockBrokerService.resetHandleConnect();
            mockBrokerService.resetHandleSubscribe();
            if (client != null) {
                client.close();
            }
        }
    }
}
