package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for terminating a topic through the admin API and for how producers,
 * consumers and readers observe that termination.
 *
 * <p>Every test works against the single topic named by {@link #topicName}; the
 * per-method {@link #setup()} / {@link #cleanup()} pair delegates to the base class.
 */
@Test
public class TopicTerminationTest extends BrokerTestBase {
    private final String topicName = "persistent://prop/use/ns-abc/topic0";

    /** Runs before each test method and delegates to {@code baseSetup()}. */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /** Runs after each test method and delegates to {@code internalCleanup()}. */
    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Terminates {@link #topicName} through the admin client and waits for the result.
     *
     * @return the message id the terminate call completed with
     * @throws InterruptedException if interrupted while waiting for the admin call
     * @throws ExecutionException if the admin call completed exceptionally
     */
    private MessageId terminateTopic() throws InterruptedException, ExecutionException {
        return this.admin.persistentTopics().terminateTopicAsync(topicName).get();
    }

    /**
     * Terminating returns the id of the last published message, and a later
     * {@code send} on the existing producer is rejected.
     */
    @Test
    public void testSimpleTermination() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        // Publish the last message BEFORE terminating: method arguments are evaluated
        // left to right, so inlining both calls into assertEquals would terminate first.
        MessageId lastPublished = producer.send("test-msg-3".getBytes());

        MessageId terminatedAt = terminateTopic();
        Assert.assertEquals(terminatedAt, lastPublished);

        try {
            producer.send("test-msg-4".getBytes());
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException expected) {
            // Expected: publishing on a terminated topic is rejected.
        }
    }

    /** Creating a new producer on an already terminated topic is rejected. */
    @Test
    public void testCreateProducerOnTerminatedTopic() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        // Must be sent before the terminate call (see testSimpleTermination).
        MessageId lastPublished = producer.send("test-msg-3".getBytes());

        MessageId terminatedAt = terminateTopic();
        Assert.assertEquals(terminatedAt, lastPublished);

        try {
            this.pulsarClient.createProducer(topicName);
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException expected) {
            // Expected: a terminated topic accepts no new producers.
        }
    }

    /**
     * Terminates the topic while another thread is issuing async sends, then checks
     * that every send completed and that once one send failed, all later ones failed too.
     */
    @Test(timeOut = 20000)
    public void testTerminateWhilePublishing() throws Exception {
        final int messageCount = 1000;
        Producer producer = this.pulsarClient.createProducer(topicName);
        CyclicBarrier startBarrier = new CyclicBarrier(2);
        // Written only by the publisher thread and read only after join(), so a plain
        // ArrayList is sufficient.
        List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>(messageCount);

        Thread publisher = new Thread(() -> {
            try {
                startBarrier.await();
            } catch (InterruptedException | BrokenBarrierException ignored) {
                // Deliberately ignored: keep publishing so the list still holds
                // messageCount futures for the checks below.
            }
            for (int i = 0; i < messageCount; i++) {
                pendingSends.add(producer.sendAsync("test".getBytes()));
            }
        });
        publisher.start();

        // Release the publisher and terminate concurrently with its sends.
        startBarrier.await();
        terminateTopic();
        publisher.join();

        try {
            FutureUtil.waitForAll(pendingSends).get();
        } catch (Exception ignored) {
            // Some sends are expected to fail after termination; we only need them all
            // to have completed, which is asserted individually below.
        }

        Assert.assertEquals(pendingSends.size(), messageCount);
        boolean failureSeen = false;
        for (CompletableFuture<MessageId> pendingSend : pendingSends) {
            Assert.assertTrue(pendingSend.isDone());
            if (failureSeen) {
                // After the first failure no later send may succeed.
                Assert.assertTrue(pendingSend.isCompletedExceptionally());
            } else {
                failureSeen = pendingSend.isCompletedExceptionally();
            }
        }
    }

    /** Terminating twice returns the same last message id both times. */
    @Test
    public void testDoubleTerminate() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId lastPublished = producer.send("test-msg-3".getBytes());

        Assert.assertEquals(terminateTopic(), lastPublished);
        Assert.assertEquals(terminateTopic(), lastPublished);
    }

    /** Terminating a partitioned topic fails with {@code NotAllowedException}. */
    @Test
    public void testTerminatePartitionedTopic() throws Exception {
        this.admin.persistentTopics().createPartitionedTopic(topicName, 4);
        try {
            terminateTopic();
            Assert.fail("Should have failed");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), PulsarAdminException.NotAllowedException.class);
        }
    }

    /**
     * A consumer still receives the messages published before termination and only
     * afterwards reports that it reached the end of the topic.
     */
    @Test(timeOut = 20000)
    public void testSimpleTerminationConsumer() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        Consumer consumer = this.pulsarClient.subscribe(topicName, "my-sub");

        MessageId firstId = producer.send("test-msg-1".getBytes());
        MessageId secondId = producer.send("test-msg-2".getBytes());

        Message first = consumer.receive();
        Assert.assertEquals(first.getMessageId(), firstId);
        consumer.acknowledge(first);

        MessageId thirdId = producer.send("test-msg-3".getBytes());
        Assert.assertFalse(consumer.hasReachedEndOfTopic());

        Assert.assertEquals(terminateTopic(), thirdId);

        Message second = consumer.receive();
        Assert.assertEquals(second.getMessageId(), secondId);
        consumer.acknowledge(second);

        Message third = consumer.receive();
        Assert.assertEquals(third.getMessageId(), thirdId);
        consumer.acknowledge(third);

        // Nothing more to read once the backlog is drained.
        Assert.assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
        // Give the end-of-topic state a moment to reach the consumer before checking it.
        Thread.sleep(100L);
        Assert.assertTrue(consumer.hasReachedEndOfTopic());
    }

    /** A message listener gets {@code reachedEndOfTopic} after the topic is terminated. */
    @Test(timeOut = 20000)
    public void testSimpleTerminationMessageListener() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        final CountDownLatch endOfTopicLatch = new CountDownLatch(1);

        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
        consumerConf.setMessageListener(new MessageListener() {
            @Override
            public void received(Consumer consumer, Message message) {
                // Messages are not inspected in this test.
            }

            @Override
            public void reachedEndOfTopic(Consumer consumer) {
                endOfTopicLatch.countDown();
                Assert.assertTrue(consumer.hasReachedEndOfTopic());
            }
        });
        Consumer consumer = this.pulsarClient.subscribe(topicName, "my-sub", consumerConf);

        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId lastPublished = producer.send("test-msg-3".getBytes());
        consumer.acknowledgeCumulative(lastPublished);

        Thread.sleep(100L);
        Assert.assertFalse(consumer.hasReachedEndOfTopic());

        Assert.assertEquals(terminateTopic(), lastPublished);

        Assert.assertTrue(endOfTopicLatch.await(3L, TimeUnit.SECONDS));
        Assert.assertTrue(consumer.hasReachedEndOfTopic());
    }

    /**
     * A reader created on a terminated topic reads all published messages from the
     * earliest position and then reports end of topic.
     */
    @Test(timeOut = 20000)
    public void testSimpleTerminationReader() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        MessageId firstId = producer.send("test-msg-1".getBytes());
        MessageId secondId = producer.send("test-msg-2".getBytes());
        MessageId thirdId = producer.send("test-msg-3".getBytes());

        Assert.assertEquals(terminateTopic(), thirdId);

        Reader reader = this.pulsarClient.createReader(topicName, MessageId.earliest, new ReaderConfiguration());
        Assert.assertEquals(reader.readNext().getMessageId(), firstId);
        Assert.assertEquals(reader.readNext().getMessageId(), secondId);
        Assert.assertEquals(reader.readNext().getMessageId(), thirdId);

        Assert.assertNull(reader.readNext(100, TimeUnit.MILLISECONDS));
        // Give the end-of-topic state a moment to reach the reader before checking it.
        Thread.sleep(100L);
        Assert.assertTrue(reader.hasReachedEndOfTopic());
    }

    /** A reader listener gets {@code reachedEndOfTopic} after the topic is terminated. */
    @Test(timeOut = 20000)
    public void testSimpleTerminationReaderListener() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        final CountDownLatch endOfTopicLatch = new CountDownLatch(1);

        ReaderConfiguration readerConf = new ReaderConfiguration();
        readerConf.setReaderListener(new ReaderListener() {
            @Override
            public void received(Reader reader, Message message) {
                // Messages are not inspected in this test.
            }

            @Override
            public void reachedEndOfTopic(Reader reader) {
                endOfTopicLatch.countDown();
                Assert.assertTrue(reader.hasReachedEndOfTopic());
            }
        });
        Reader reader = this.pulsarClient.createReader(topicName, MessageId.latest, readerConf);

        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId lastPublished = producer.send("test-msg-3".getBytes());

        Thread.sleep(100L);
        Assert.assertFalse(reader.hasReachedEndOfTopic());

        Assert.assertEquals(terminateTopic(), lastPublished);

        Assert.assertTrue(endOfTopicLatch.await(3L, TimeUnit.SECONDS));
        Assert.assertTrue(reader.hasReachedEndOfTopic());
    }

    /** Subscribing to an already terminated topic reports end of topic shortly after. */
    @Test(timeOut = 20000)
    public void testSubscribeOnTerminatedTopic() throws Exception {
        Producer producer = this.pulsarClient.createProducer(topicName);
        producer.send("test-msg-1".getBytes());
        // Must be sent before the terminate call: arguments are evaluated left to right,
        // so inlining both calls into assertEquals would terminate first.
        MessageId lastPublished = producer.send("test-msg-2".getBytes());

        Assert.assertEquals(terminateTopic(), lastPublished);

        Consumer consumer = this.pulsarClient.subscribe(topicName, "my-sub");
        Thread.sleep(200L);
        Assert.assertTrue(consumer.hasReachedEndOfTopic());
    }

    /** Same as above, but the topic was terminated without any message published. */
    @Test(timeOut = 20000)
    public void testSubscribeOnTerminatedTopicWithNoMessages() throws Exception {
        // The producer is created only so that the topic exists before it is terminated.
        this.pulsarClient.createProducer(topicName);
        terminateTopic();

        Consumer consumer = this.pulsarClient.subscribe(topicName, "my-sub");
        Thread.sleep(200L);
        Assert.assertTrue(consumer.hasReachedEndOfTopic());
    }
}
