package org.apache.pulsar.client.impl;

import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.Commands;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/MessageIdTest.class */
public class MessageIdTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MessageIdTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    @Test(timeOut = 10000)
    public void producerSendAsync() throws PulsarClientException {
        String str = "persistent://prop/cluster/namespace/topic-producerSendAsync";
        String str2 = "my-message-producerSendAsync-";
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic(str).subscriptionName("my-subscription-producerSendAsync").subscribe();
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 30; i++) {
            arrayList.add(create.sendAsync((str2 + i).getBytes()));
        }
        MessageId messageId = null;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                MessageId messageId2 = (MessageIdImpl) ((Future) it.next()).get();
                if (messageId != null) {
                    Assert.assertTrue(messageId2.compareTo(messageId) > 0, "Message Ids should be in ascending order");
                }
                hashSet.add(messageId2);
                messageId = messageId2;
            } catch (Exception e) {
                Assert.fail("Failed to publish message, Exception: " + e.getMessage());
            }
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 30, "Not all messages published successfully");
        for (int i2 = 0; i2 < 30; i2++) {
            Message<byte[]> receive = subscribe.receive();
            Assert.assertEquals(new String(receive.getData()), str2 + i2);
            Assert.assertTrue(hashSet.remove(receive.getMessageId()), "Failed to receive message");
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 0, "Not all messages received successfully");
        subscribe.unsubscribe();
    }

    @Test(timeOut = 10000)
    public void producerSend() throws PulsarClientException {
        String str = "persistent://prop/cluster/namespace/topic-producerSend";
        String str2 = "my-message-producerSend-";
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic(str).subscriptionName("my-subscription-producerSend").subscribe();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 30; i++) {
            hashSet.add(create.send((str2 + i).getBytes()));
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 30, "Not all messages published successfully");
        for (int i2 = 0; i2 < 30; i2++) {
            Assert.assertTrue(hashSet.remove(subscribe.receive().getMessageId()), "Failed to receive Message");
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 0, "Not all messages received successfully");
        subscribe.unsubscribe();
    }

    @Test(timeOut = 10000)
    public void partitionedProducerSendAsync() throws PulsarClientException, PulsarAdminException {
        String str = "persistent://prop/cluster/namespace/topic-partitionedProducerSendAsync";
        String str2 = "my-message-partitionedProducerSendAsync-";
        this.admin.topics().createPartitionedTopic(str, 3);
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic(str).subscriptionName("my-subscription-partitionedProducerSendAsync").subscribe();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (int i = 0; i < 30; i++) {
            hashSet2.add(create.sendAsync((str2 + i).getBytes()));
        }
        hashSet2.forEach(future -> {
            try {
                hashSet.add(future.get());
            } catch (Exception e) {
                Assert.fail("Failed to publish message, Exception: " + e.getMessage());
            }
        });
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 30, "Not all messages published successfully");
        for (int i2 = 0; i2 < 30; i2++) {
            MessageId innerMessageId = subscribe.receive().getMessageId().getInnerMessageId();
            log.info("Message ID Received = " + innerMessageId);
            Assert.assertTrue(hashSet.remove(innerMessageId), "Failed to receive Message");
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 0, "Not all messages received successfully");
        subscribe.unsubscribe();
    }

    @Test(timeOut = 10000)
    public void partitionedProducerSend() throws PulsarClientException, PulsarAdminException {
        String str = "persistent://prop/cluster/namespace/topic-partitionedProducerSend";
        String str2 = "my-message-partitionedProducerSend-";
        this.admin.topics().createPartitionedTopic(str, 7);
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic(str).subscriptionName("my-subscription-partitionedProducerSend").subscribe();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 30; i++) {
            hashSet.add(create.send((str2 + i).getBytes()));
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 30, "Not all messages published successfully");
        for (int i2 = 0; i2 < 30; i2++) {
            Assert.assertTrue(hashSet.remove(subscribe.receive().getMessageId().getInnerMessageId()), "Failed to receive Message");
        }
        log.info("Message IDs = " + hashSet);
        Assert.assertEquals(hashSet.size(), 0, "Not all messages received successfully");
    }

    @Test
    public void testChecksumVersionComptability() throws Exception {
        ProducerImpl create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        ProducerImpl producerImpl = (ProducerImpl) Mockito.spy(create);
        ((ProducerImpl) Mockito.doReturn(Integer.valueOf(producerImpl.brokerChecksumSupportedVersion() + 1)).when(producerImpl)).brokerChecksumSupportedVersion();
        ((ProducerImpl) Mockito.doAnswer(invocationOnMock -> {
            return create.getState();
        }).when(producerImpl)).getState();
        ((ProducerImpl) Mockito.doAnswer(invocationOnMock2 -> {
            return create.getClientCnx();
        }).when(producerImpl)).getClientCnx();
        ((ProducerImpl) Mockito.doAnswer(invocationOnMock3 -> {
            return create.cnx();
        }).when(producerImpl)).cnx();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/topic1").subscriptionName("my-sub").subscribe();
        stopBroker();
        this.pulsarClient.timer().stop();
        ClientCnx clientCnx = (ClientCnx) Mockito.spy(new ClientCnx(new ClientConfigurationData(), this.pulsarClient.eventLoopGroup()));
        ((ClientCnx) Mockito.doReturn(Integer.valueOf(producerImpl.brokerChecksumSupportedVersion() - 1)).when(clientCnx)).getRemoteEndpointProtocolVersion();
        create.setClientCnx(clientCnx);
        CompletableFuture sendAsync = producerImpl.sendAsync("message-1".getBytes());
        byte[] bytes = "message-2".getBytes();
        TypedMessageBuilderImpl value = producerImpl.newMessage().value(bytes);
        CompletableFuture<MessageId> sendAsync2 = value.sendAsync();
        value.getContent().put(bytes.length - 1, (byte) 51);
        create.setClientCnx((ClientCnx) null);
        startBroker();
        create.grabCnx();
        try {
            sendAsync.get();
            sendAsync2.get();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail");
        }
        subscribe.grabCnx();
        Assert.assertEquals(new String(subscribe.receive(1, TimeUnit.SECONDS).getData()), "message-1");
        Assert.assertEquals(new String(subscribe.receive(1, TimeUnit.SECONDS).getData()), "message-3");
    }

    @Test
    public void testChecksumReconnection() throws Exception {
        ProducerImpl create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        ProducerImpl producerImpl = (ProducerImpl) Mockito.spy(create);
        ((ProducerImpl) Mockito.doReturn(Integer.valueOf(producerImpl.brokerChecksumSupportedVersion() + 1)).when(producerImpl)).brokerChecksumSupportedVersion();
        ((ProducerImpl) Mockito.doAnswer(invocationOnMock -> {
            return create.getState();
        }).when(producerImpl)).getState();
        ((ProducerImpl) Mockito.doAnswer(invocationOnMock2 -> {
            return create.getClientCnx();
        }).when(producerImpl)).getClientCnx();
        ((ProducerImpl) Mockito.doAnswer(invocationOnMock3 -> {
            return create.cnx();
        }).when(producerImpl)).cnx();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/topic1").subscriptionName("my-sub").subscribe();
        stopBroker();
        this.pulsarClient.timer().stop();
        ClientCnx clientCnx = (ClientCnx) Mockito.spy(new ClientCnx(new ClientConfigurationData(), this.pulsarClient.eventLoopGroup()));
        ((ClientCnx) Mockito.doReturn(Integer.valueOf(producerImpl.brokerChecksumSupportedVersion() - 1)).when(clientCnx)).getRemoteEndpointProtocolVersion();
        create.setClientCnx(clientCnx);
        CompletableFuture sendAsync = producerImpl.sendAsync("message-1".getBytes());
        byte[] bytes = "message-2".getBytes();
        TypedMessageBuilderImpl value = producerImpl.newMessage().value(bytes);
        CompletableFuture<MessageId> sendAsync2 = value.sendAsync();
        value.getContent().put(bytes.length - 1, (byte) 51);
        create.setClientCnx((ClientCnx) null);
        startBroker();
        create.grabCnx();
        try {
            sendAsync.get(10L, TimeUnit.SECONDS);
            sendAsync2.get(10L, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail");
        }
        subscribe.grabCnx();
        Assert.assertEquals(new String(subscribe.receive(1, TimeUnit.SECONDS).getData()), "message-1");
        Assert.assertEquals(new String(subscribe.receive(1, TimeUnit.SECONDS).getData()), "message-3");
    }

    @Test
    public void testCorruptMessageRemove() throws Exception {
        ProducerImpl producerImpl = (ProducerImpl) Mockito.spy(this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/retry-topic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).sendTimeout(10, TimeUnit.MINUTES).create());
        Field declaredField = ProducerImpl.class.getDeclaredField("producerId");
        declaredField.setAccessible(true);
        long longValue = ((Long) declaredField.get(producerImpl)).longValue();
        producerImpl.cnx().registerProducer(longValue, producerImpl);
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/retry-topic").subscriptionName("my-sub").subscribe();
        stopBroker();
        byte[] bytes = "message-1".getBytes();
        TypedMessageBuilderImpl value = producerImpl.newMessage().value(bytes);
        CompletableFuture<MessageId> sendAsync = value.sendAsync();
        value.getContent().put(bytes.length - 1, (byte) 50);
        startBroker();
        try {
            sendAsync.get();
            Assert.fail("send message should have failed with checksum excetion");
        } catch (Exception e) {
            if (!(e.getCause() instanceof PulsarClientException.ChecksumException)) {
                Assert.fail("Callback should have only failed with ChecksumException", e);
            }
        }
        byte[] bytes2 = "message-2".getBytes();
        TypedMessageBuilderImpl value2 = producerImpl.newMessage().value("message-1".getBytes());
        ProducerImpl.OpSendMsg create = ProducerImpl.OpSendMsg.create(value2.getMessage(), Commands.newSend(longValue, 1L, 1, Commands.ChecksumType.Crc32c, value.getMetadataBuilder().setProducerName("test").setSequenceId(1L).setPublishTime(10L).build(), Unpooled.wrappedBuffer(value2.getContent())), 1L, (SendCallback) null);
        Assert.assertTrue(producerImpl.verifyLocalBufferIsNotCorrupted(create));
        value2.getContent().put(bytes2.length - 1, (byte) 50);
        Assert.assertFalse(producerImpl.verifyLocalBufferIsNotCorrupted(create));
        Assert.assertEquals(producerImpl.getPendingQueueSize(), 0);
        stopBroker();
        producerImpl.newMessage().value("message-1".getBytes()).sendAsync();
        ClientCnx clientCnx = (ClientCnx) Mockito.spy(new ClientCnx(new ClientConfigurationData(), this.pulsarClient.eventLoopGroup()));
        ((ClientCnx) Mockito.doThrow(new IllegalStateException("broker is already stopped")).when(clientCnx)).ctx();
        try {
            producerImpl.recoverChecksumError(clientCnx, 1L);
            Assert.fail("it should call : resendMessages() => which should throw above mocked exception");
        } catch (IllegalStateException e2) {
            Assert.assertEquals("broker is already stopped", e2.getMessage());
        }
        producerImpl.close();
        subscribe.close();
    }
}
