package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/ConsumerRedeliveryTest.class */
public class ConsumerRedeliveryTest extends ProducerConsumerBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setManagedLedgerCacheEvictionFrequency(0.1d);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testOrderedRedelivery() throws Exception {
        this.conf.setManagedLedgerMaxEntriesPerLedger(2);
        this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/redelivery").producerName("my-producer-name").create();
        ConsumerBuilder<byte[]> subscriptionType = this.pulsarClient.newConsumer().topic("persistent://my-property/my-ns/redelivery").subscriptionName("s1").subscriptionType(SubscriptionType.Shared);
        ConsumerImpl subscribe = subscriptionType.subscribe();
        for (int i = 0; i < 100; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        int i2 = 0;
        HashSet newHashSet = Sets.newHashSet();
        for (int i3 = 0; i3 < 100; i3++) {
            Message receive = subscribe.receive(5, TimeUnit.SECONDS);
            if (receive == null || i2 % 2 != 0) {
                newHashSet.add(receive.getMessageId());
            } else {
                subscribe.acknowledge(receive);
            }
            i2++;
        }
        Assert.assertEquals(100, i2);
        subscribe.redeliverUnacknowledgedMessages(newHashSet);
        MessageIdImpl messageIdImpl = null;
        for (int i4 = 0; i4 < 50; i4++) {
            MessageIdImpl messageId = subscribe.receive(5, TimeUnit.SECONDS).getMessageId();
            if (messageIdImpl != null) {
                Assert.assertTrue(messageIdImpl.getLedgerId() <= messageId.getLedgerId(), "lastMsgId: " + messageIdImpl + " -- msgId: " + messageId);
            }
            messageIdImpl = messageId;
        }
        subscribe.close();
        Consumer<byte[]> subscribe2 = subscriptionType.subscribe();
        MessageIdImpl messageIdImpl2 = null;
        for (int i5 = 0; i5 < 50; i5++) {
            MessageIdImpl messageId2 = subscribe2.receive(5, TimeUnit.SECONDS).getMessageId();
            if (messageIdImpl2 != null) {
                Assert.assertTrue(messageIdImpl2.getLedgerId() <= messageId2.getLedgerId());
            }
            messageIdImpl2 = messageId2;
        }
        create.close();
        subscribe2.close();
    }
}
