package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/ClientDeduplicationFailureTest.class */
public class ClientDeduplicationFailureTest {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClientDeduplicationFailureTest.class);
    LocalBookkeeperEnsemble bkEnsemble;
    ServiceConfiguration config;
    URL url;
    PulsarService pulsar;
    PulsarAdmin admin;
    PulsarClient pulsarClient;
    BrokerStats brokerStatsClient;
    final String tenant = "external-repl-prop";
    String primaryHost;

    /* loaded from: input_file:org/apache/pulsar/client/api/ClientDeduplicationFailureTest$ProducerThread.class */
    /**
     * Publishes {@code "foo-<n>"} messages with an explicit, monotonically increasing sequence id
     * (starting at 1) on a dedicated thread until {@link #stop()} is called.
     *
     * <p>{@link #getLastSeqId()} reports how many publishes have been acknowledged so far; the
     * counter is incremented from the send-completion callback.
     */
    private static class ProducerThread implements Runnable {
        /** Upper bound for both the worker thread to exit and the final publish to settle. */
        private static final long STOP_TIMEOUT_SECONDS = 60L;

        private final Producer<String> producer;
        /** Number of publishes whose future completed successfully. */
        private final AtomicLong atomicLong = new AtomicLong(0);
        private final Thread thread = new Thread(this);
        private volatile boolean isRunning = false;
        /** Written by the worker thread, read by the thread calling {@link #stop()}. */
        private volatile CompletableFuture<MessageId> lastMessageFuture;
        /** Sequence id of the next message to send; touched only by the worker thread. */
        private long i = 1;

        public ProducerThread(Producer<String> producer) {
            this.producer = producer;
        }

        /** Worker loop: sends one message per iteration while the running flag is set. */
        @Override
        public void run() {
            while (isRunning) {
                CompletableFuture<MessageId> sendFuture =
                        producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
                lastMessageFuture = sendFuture;
                sendFuture.thenAccept(messageId -> atomicLong.incrementAndGet())
                        .exceptionally(th -> {
                            ClientDeduplicationFailureTest.log.info("publish exception:", th);
                            return null;
                        });
                i++;
            }
            // i already points at the next, unsent sequence id; the last one sent is i - 1.
            ClientDeduplicationFailureTest.log.info("done Producing! Last send: {}", i - 1);
        }

        /** Sets the running flag and starts the worker thread. */
        public void start() {
            isRunning = true;
            thread.start();
        }

        /**
         * Clears the running flag, waits for the worker thread to exit and then waits for the
         * last publish it issued to complete.
         *
         * @throws RuntimeException if the worker does not exit or the last publish does not
         *     complete within the timeout, if that publish failed, or if the caller is interrupted
         */
        public void stop() {
            isRunning = false;
            ClientDeduplicationFailureTest.log.info("Waiting for last message to complete");

            // Join first: without it the worker may still be inside an iteration and issue one
            // more send after we have sampled lastMessageFuture.
            try {
                thread.join(TimeUnit.SECONDS.toMillis(STOP_TIMEOUT_SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for producer thread to exit", e);
            }
            if (thread.isAlive()) {
                throw new RuntimeException("Producer thread hasn't exited within timeout!");
            }

            CompletableFuture<MessageId> last = lastMessageFuture;
            if (last != null) { // null when the loop never sent anything
                try {
                    last.get(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    throw new RuntimeException("Last message hasn't completed within timeout!", e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            ClientDeduplicationFailureTest.log.info("Producer Thread stopped!");
        }

        /**
         * @return the number of publishes acknowledged so far
         */
        public long getLastSeqId() {
            return atomicLong.get();
        }
    }

    /**
     * Brings up a fresh environment before each test method: a {@link LocalBookkeeperEnsemble},
     * a {@link PulsarService} pointed at the ensemble's ZooKeeper port, an admin client, and a
     * {@link PulsarClient}. Finally registers the cluster and the test tenant.
     *
     * @param method the test method about to run; only its name is logged
     * @throws Exception if any component fails to start or an admin call fails
     */
    @BeforeMethod(timeOut = 300000)
    void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", method.getName());

        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();

        // Broker configuration: ports are passed as 0 for both the web and the binary service.
        config = Mockito.spy(new ServiceConfiguration());
        config.setClusterName("use");
        config.setWebServicePort(Optional.of(0));
        config.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
        config.setBrokerServicePort(Optional.of(0));
        config.setTlsAllowInsecureConnection(true);
        config.setAdvertisedAddress("localhost");
        config.setAllowAutoTopicCreationType("non-partitioned");

        // Load manager selection and load-balancer switches, all turned off.
        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        config.setLoadBalancerSheddingEnabled(false);
        config.setLoadBalancerAutoBundleSplitEnabled(false);
        config.setLoadBalancerEnabled(false);
        config.setLoadBalancerAutoUnloadSplitBundlesEnabled(false);

        pulsar = new PulsarService(config);
        pulsar.start();

        final String webAddress = pulsar.getWebServiceAddress();
        url = new URL(webAddress);
        admin = PulsarAdmin.builder().serviceHttpUrl(webAddress).build();
        brokerStatsClient = admin.brokerStats();
        primaryHost = pulsar.getWebServiceAddress();

        admin.clusters().createCluster(config.getClusterName(), new ClusterData(url.toString()));

        pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .maxBackoffInterval(1L, TimeUnit.SECONDS)
                .build();

        final TenantInfo testTenant = new TenantInfo();
        testTenant.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
        admin.tenants().createTenant("external-repl-prop", testTenant);
    }

    /**
     * Tears down everything {@code setup} created, in reverse order of creation: client, admin
     * client, broker, then the BookKeeper ensemble.
     *
     * <p>Runs even when setup or the test failed ({@code alwaysRun = true}), so every resource is
     * null-checked. Each close sits in its own {@code finally} so that one failing close does not
     * leave the remaining services running. Fields are cleared after closing so a later
     * invocation does not close the same instance twice.
     *
     * @throws Exception the failure raised by a close call, if any
     */
    @AfterMethod(alwaysRun = true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        try {
            if (pulsarClient != null) {
                pulsarClient.close();
            }
        } finally {
            pulsarClient = null;
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                admin = null;
                try {
                    if (pulsar != null) {
                        pulsar.close();
                    }
                } finally {
                    pulsar = null;
                    if (bkEnsemble != null) {
                        bkEnsemble.stop();
                        bkEnsemble = null;
                    }
                }
            }
        }
    }

    /**
     * Publishes continuously with explicit sequence ids while BookKeeper is stopped and started
     * five times, then reads the topic back from the earliest position and checks that the
     * sequence ids are contiguous starting at 1 and end at the number of acknowledged publishes.
     *
     * <p>A final {@code "end"} message marks where the reader stops.
     *
     * @throws Exception on any admin, client or bookie start/stop failure
     */
    @Test(timeOut = 300000)
    public void testClientDeduplicationCorrectnessWithFailure() throws Exception {
        final String namespace = "external-repl-prop/dedup";
        final String topic = "persistent://external-repl-prop/dedup/my-topic1";
        final String producerName = "test-producer-1";

        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(Lists.newArrayList("use")));
        admin.namespaces().setDeduplicationStatus(namespace, true);
        admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .blockIfQueueFull(true)
                .sendTimeout(0, TimeUnit.SECONDS)
                .topic(topic)
                .producerName(producerName)
                .create();

        ProducerThread producerThread = new ProducerThread(producer);
        producerThread.start();
        boolean stopRequested = false;
        try {
            // Wait until the producer is registered and at least something has been stored.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    TopicStats polled = admin.topics().getStats(topic);
                    return polled.publishers.size() == 1
                            && producerName.equals(polled.publishers.get(0).getProducerName())
                            && polled.storageSize > 0;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            TopicStats topicStats = admin.topics().getStats(topic);
            Assert.assertEquals(topicStats.publishers.size(), 1);
            Assert.assertEquals(topicStats.publishers.get(0).getProducerName(), producerName);
            Assert.assertTrue(topicStats.storageSize > 0);

            Random random = new Random();
            for (int round = 0; round < 5; round++) {
                log.info("Stopping BK...");
                bkEnsemble.stopBK();
                Thread.sleep(1000 + random.nextInt(500));
                log.info("Starting BK...");
                bkEnsemble.startBK();
            }

            stopRequested = true;
            producerThread.stop();
        } finally {
            // If something failed before the regular stop, do not leave the publisher spinning.
            if (!stopRequested) {
                try {
                    producerThread.stop();
                } catch (RuntimeException e) {
                    log.warn("Failed to stop producer thread after test failure", e);
                }
            }
        }

        producer.newMessage().sequenceId(producerThread.getLastSeqId() + 1).value("end").send();
        producer.close();

        Message<String> previous = null;
        int messagesRead = 0;
        try (Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                .startMessageId(MessageId.earliest)
                .topic(topic)
                .create()) {
            Message<String> current;
            while ((current = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                if ("end".equals(current.getValue())) {
                    // previous is null only if no regular message preceded the marker; the
                    // assertion after the loop reports that case.
                    if (previous != null) {
                        log.info("Last seq Id received: {}", previous.getSequenceId());
                    }
                    break;
                }
                long expectedSeqId = (previous == null) ? 1L : previous.getSequenceId() + 1;
                Assert.assertEquals(current.getSequenceId(), expectedSeqId);
                previous = current;
                messagesRead++;
            }
        }

        log.info("# of messages read: {}", messagesRead);
        Assert.assertNotNull(previous, "no message was read before the end marker");
        Assert.assertEquals(previous.getSequenceId(), producerThread.getLastSeqId());
    }

    /**
     * Checks publish behaviour around a BookKeeper outage with deduplication enabled:
     * <ol>
     *   <li>sequence ids 0-9 are published and consumed while BK is up;</li>
     *   <li>with BK stopped, async sends 10-19 and two sync sends of id 10 must all fail;</li>
     *   <li>after BK is started again, ids 20-29 are published and consumed;</li>
     *   <li>the second subscription must have seen exactly ids 0-9 followed by 20-29;</li>
     *   <li>the consumer's last message id must match the last received message's ledger and
     *       entry id.</li>
     * </ol>
     *
     * @throws Exception on any admin, client or bookie start/stop failure
     */
    @Test(timeOut = 300000)
    public void testClientDeduplicationWithBkFailure() throws Exception {
        final String namespace = "external-repl-prop/dedup";
        final String topic = "persistent://external-repl-prop/dedup/my-topic1";

        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(Lists.newArrayList("use")));
        admin.namespaces().setDeduplicationStatus(namespace, true);

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .producerName("test-producer-1")
                .create();
        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .consumerName("test-consumer-1")
                .subscriptionName("sub1")
                .subscribe();
        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .consumerName("test-consumer-2")
                .subscriptionName("sub2")
                .subscribe();

        // Appended to by the consumer thread and read by the test thread, hence a concurrent list.
        List<Message<String>> receivedBySub2 = new CopyOnWriteArrayList<>();
        Thread sub2Thread = new Thread(() -> {
            while (true) {
                try {
                    Message<String> msg = consumer2.receive();
                    receivedBySub2.add(msg);
                    consumer2.acknowledge(msg);
                } catch (PulsarClientException e) {
                    log.error("Failed to consume message: {}", e, e);
                    return;
                }
            }
        });
        sub2Thread.start();

        try {
            // Wait until BOTH subscriptions report their single, expected consumer.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    TopicStats polled = admin.topics().getStats(topic);
                    return hasSingleConsumer(polled, "sub1", "test-consumer-1")
                            && hasSingleConsumer(polled, "sub2", "test-consumer-2");
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            TopicStats topicStats = admin.topics().getStats(topic);
            Assert.assertNotNull(topicStats);
            assertSingleConsumer(topicStats, "sub1", "test-consumer-1");
            assertSingleConsumer(topicStats, "sub2", "test-consumer-2");

            // Phase 1: BK healthy, ids 0-9 go through.
            for (int i = 0; i < 10; i++) {
                producer.newMessage().sequenceId(i).value("foo-" + i).send();
            }
            for (int i = 0; i < 10; i++) {
                Message<String> msg = consumer1.receive();
                consumer1.acknowledge(msg);
                Assert.assertEquals(msg.getValue(), "foo-" + i);
                Assert.assertEquals(msg.getSequenceId(), i);
            }

            // Phase 2: BK down, every publish is expected to fail.
            log.info("Stopping BK...");
            bkEnsemble.stopBK();

            List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
            for (int i = 10; i < 20; i++) {
                final int seq = i;
                CompletableFuture<MessageId> sendFuture =
                        producer.newMessage().sequenceId(seq).value("foo-" + seq).sendAsync();
                sendFuture.thenRun(() -> log.error("message: {} successful", seq))
                        .exceptionally(th -> {
                            log.info("message: {} failed: {}", seq, th, th);
                            return null;
                        });
                pendingSends.add(sendFuture);
            }
            for (CompletableFuture<MessageId> sendFuture : pendingSends) {
                try {
                    sendFuture.join();
                    Assert.fail("async publish unexpectedly succeeded while BK was stopped");
                } catch (CompletionException expected) {
                    // expected: the publish must fail while BK is stopped
                } catch (Exception e) {
                    Assert.fail("async publish failed with an unexpected exception type", e);
                }
            }

            // The same sequence id is attempted twice synchronously; both attempts must fail.
            for (int attempt = 0; attempt < 2; attempt++) {
                try {
                    producer.newMessage().sequenceId(10L).value("foo-10").send();
                    Assert.fail("sync publish unexpectedly succeeded while BK was stopped");
                } catch (PulsarClientException expected) {
                    // expected: the publish must fail while BK is stopped
                }
            }

            // Phase 3: BK back, ids 20-29 go through.
            log.info("Starting BK...");
            bkEnsemble.startBK();

            for (int i = 20; i < 30; i++) {
                producer.newMessage().sequenceId(i).value("foo-" + i).send();
            }
            MessageId lastReceivedId = null;
            for (int i = 20; i < 30; i++) {
                Message<String> msg = consumer1.receive();
                lastReceivedId = msg.getMessageId();
                consumer1.acknowledge(msg);
                Assert.assertEquals(msg.getValue(), "foo-" + i);
                Assert.assertEquals(msg.getSequenceId(), i);
            }

            // The second subscription must hold 0-9 and 20-29, nothing from the outage window.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> receivedBySub2.size() >= 20, 5, 200L);
            Assert.assertEquals(receivedBySub2.size(), 20);
            for (int i = 0; i < 10; i++) {
                Assert.assertEquals(receivedBySub2.get(i).getValue(), "foo-" + i);
                Assert.assertEquals(receivedBySub2.get(i).getSequenceId(), i);
            }
            for (int i = 10; i < 20; i++) {
                Assert.assertEquals(receivedBySub2.get(i).getValue(), "foo-" + (i + 10));
                Assert.assertEquals(receivedBySub2.get(i).getSequenceId(), i + 10);
            }

            BatchMessageIdImpl lastReceived = (BatchMessageIdImpl) lastReceivedId;
            MessageIdImpl lastInTopic = (MessageIdImpl) consumer1.getLastMessageId();
            Assert.assertEquals(lastInTopic.getLedgerId(), lastReceived.getLedgerId());
            Assert.assertEquals(lastInTopic.getEntryId(), lastReceived.getEntryId());
        } finally {
            // Interrupt the background consumer on failure paths as well as on success.
            sub2Thread.interrupt();
        }
    }

    /** Returns true when {@code subscription} exists in {@code stats} with exactly one consumer named {@code consumerName}. */
    private static boolean hasSingleConsumer(TopicStats stats, String subscription, String consumerName) {
        if (stats == null) {
            return false;
        }
        SubscriptionStats subscriptionStats = stats.subscriptions.get(subscription);
        return subscriptionStats != null
                && subscriptionStats.consumers.size() == 1
                && consumerName.equals(subscriptionStats.consumers.get(0).consumerName);
    }

    /** Asserts that {@code subscription} exists in {@code stats} with exactly one consumer named {@code consumerName}. */
    private static void assertSingleConsumer(TopicStats stats, String subscription, String consumerName) {
        SubscriptionStats subscriptionStats = stats.subscriptions.get(subscription);
        Assert.assertNotNull(subscriptionStats);
        Assert.assertEquals(subscriptionStats.consumers.size(), 1);
        Assert.assertEquals(subscriptionStats.consumers.get(0).consumerName, consumerName);
    }
}
