package org.apache.pulsar.compaction;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/compaction/CompactionTest.class */
public class CompactionTest extends MockedPulsarServiceBaseTest {
    // Scheduler handed to each TwoPhaseCompactor built by the tests; created in setup() as a
    // single-threaded scheduled executor and stopped with shutdownNow() in cleanup().
    private ScheduledExecutorService compactionScheduler;
    // BookKeeper client handed to each TwoPhaseCompactor built by the tests; created in setup()
    // through the broker's BookKeeperClientFactory.
    private BookKeeper bk;

    /* loaded from: input_file:org/apache/pulsar/compaction/CompactionTest$EncKeyReader.class */
    /**
     * {@link CryptoKeyReader} used by the tests; it loads key material from files under
     * {@code ./src/test/resources/certificate/}.
     *
     * <p>Both lookups populate and return the same {@link #keyInfo} instance, so a previously
     * returned object is overwritten by the next lookup.
     */
    class EncKeyReader implements CryptoKeyReader {
        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

        EncKeyReader() {
        }

        /**
         * Loads {@code public-key.<keyName>} from the test certificate directory.
         *
         * @param keyName  suffix of the key file to read
         * @param metadata ignored
         * @return the shared {@link #keyInfo} holding the file's bytes
         */
        @Override
        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
            return loadKey("./src/test/resources/certificate/public-key." + keyName);
        }

        /**
         * Loads {@code private-key.<keyName>} from the test certificate directory.
         *
         * @param keyName  suffix of the key file to read
         * @param metadata ignored
         * @return the shared {@link #keyInfo} holding the file's bytes
         */
        @Override
        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
            return loadKey("./src/test/resources/certificate/private-key." + keyName);
        }

        /**
         * Reads the file at {@code location} into {@link #keyInfo}; fails the running test when the
         * file is missing, unreadable, or cannot be read.
         */
        private EncryptionKeyInfo loadKey(String location) {
            if (!Files.isReadable(Paths.get(location))) {
                Assert.fail("Certificate file " + location + " is not present or not readable.");
                return null; // unreachable: Assert.fail always throws
            }
            try {
                this.keyInfo.setKey(Files.readAllBytes(Paths.get(location)));
                return this.keyInfo;
            } catch (IOException e) {
                // Keep the I/O error as the cause so the test report shows why the read failed.
                Assert.fail("Failed to read certificate from " + location, e);
                return null; // unreachable: Assert.fail always throws
            }
        }
    }

    /**
     * Per-test fixture: runs the base-class setup, registers the "use" cluster, the "my-property"
     * tenant and the "my-property/use/my-ns" namespace, then creates the scheduler and BookKeeper
     * client that the tests pass to the compactor.
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();

        // Cluster, tenant and namespace that every topic in this class lives under.
        ClusterData clusterData = new ClusterData(this.pulsar.getWebServiceAddress());
        this.admin.clusters().createCluster("use", clusterData);

        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"));
        this.admin.tenants().createTenant("my-property", tenantInfo);

        this.admin.namespaces().createNamespace("my-property/use/my-ns");

        // Single daemon thread named "compaction-<n>".
        ThreadFactoryBuilder threadFactoryBuilder =
                new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true);
        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(threadFactoryBuilder.build());

        this.bk = this.pulsar.getBookKeeperClientFactory()
                .create(this.conf, (ZooKeeper) null, Optional.empty(), (Map) null);
    }

    /**
     * Per-test teardown: runs the base-class cleanup and stops the compaction scheduler.
     *
     * <p>The scheduler is stopped in a {@code finally} block so that it is shut down even when the
     * base-class cleanup throws.
     */
    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            // Guard against a fixture whose scheduler was never created.
            if (this.compactionScheduler != null) {
                this.compactionScheduler.shutdownNow();
            }
        }
    }

    /**
     * Publishes 20 unbatched messages spread over at most 10 keys, compacts the topic, and then
     * verifies both read modes on subscription "sub1":
     * <ul>
     *   <li>with {@code readCompacted(true)} each received message carries the last value published
     *       for its key, and every key is seen;</li>
     *   <li>with {@code readCompacted(false)} all messages are received in publication order.</li>
     * </ul>
     */
    @Test
    public void testCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";
        final int numMessages = 20;
        final int maxKeys = 10;

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        Map<String, byte[]> latestByKey = new HashMap<>();
        ArrayList<Pair<String, byte[]>> published = new ArrayList<>();
        Random random = new Random(0L); // fixed seed keeps the key sequence reproducible

        // Open and immediately close "sub1" before publishing.
        // NOTE(review): presumably this registers the subscription so the messages are kept for it - confirm.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        for (int i = 0; i < numMessages; i++) {
            // NOTE(review): CONF_KEY is a TypedMessageBuilder constant used here as the key prefix;
            // this looks like a substitution for a plain string literal - confirm.
            String key = TypedMessageBuilder.CONF_KEY + random.nextInt(maxKeys);
            byte[] payload = ("my-message-" + key + "-" + i).getBytes();
            producer.newMessage().key(key).value(payload).send();
            latestByKey.put(key, payload);
            published.add(Pair.of(key, payload));
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // Compacted view: tick keys off as they arrive until none are left.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            do {
                Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull(message,
                        "Timed out waiting for a compacted message; keys still pending: " + latestByKey.keySet());
                Assert.assertEquals(latestByKey.remove(message.getKey()), message.getData());
            } while (!latestByKey.isEmpty());
        }

        // Non-compacted view: every published message, in order.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(false).subscribe()) {
            for (Pair<String, byte[]> expected : published) {
                Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull(message, "Timed out waiting for message with key " + expected.getLeft());
                Assert.assertEquals(expected.getLeft(), message.getKey());
                Assert.assertEquals(expected.getRight(), message.getData());
            }
        }
    }

    /**
     * Same scenario as the consumer-based compaction test, but read back through the {@code Reader}
     * API starting at {@code MessageId.earliest}. Retention on the namespace is set to
     * {@code RetentionPolicies(-1, -1)} before publishing.
     * <ul>
     *   <li>with {@code readCompacted(true)} each message read carries the last value published for
     *       its key, and every key is seen;</li>
     *   <li>with {@code readCompacted(false)} all messages are read in publication order.</li>
     * </ul>
     */
    @Test
    public void testCompactionWithReader() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";
        final int numMessages = 20;
        final int maxKeys = 10;

        // NOTE(review): no subscription is created in this test; (-1, -1) presumably means
        // "retain indefinitely" so the messages survive without one - confirm.
        this.admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1));

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        Map<String, String> latestByKey = new HashMap<>();
        ArrayList<Pair<String, String>> published = new ArrayList<>();
        Random random = new Random(0L); // fixed seed keeps the key sequence reproducible

        for (int i = 0; i < numMessages; i++) {
            // NOTE(review): CONF_KEY is a TypedMessageBuilder constant used here as the key prefix;
            // this looks like a substitution for a plain string literal - confirm.
            String key = TypedMessageBuilder.CONF_KEY + random.nextInt(maxKeys);
            String value = "my-message-" + key + "-" + i;
            producer.newMessage().key(key).value(value.getBytes()).send();
            latestByKey.put(key, value);
            published.add(Pair.of(key, value));
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // Compacted view: tick keys off as they arrive until none are left.
        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic).readCompacted(true).startMessageId(MessageId.earliest).create()) {
            do {
                Message<byte[]> message = reader.readNext(2, TimeUnit.SECONDS);
                Assert.assertNotNull(message,
                        "Timed out waiting for a compacted message; keys still pending: " + latestByKey.keySet());
                Assert.assertEquals(latestByKey.remove(message.getKey()), new String(message.getData()));
            } while (!latestByKey.isEmpty());
        }

        // Non-compacted view: every published message, in order.
        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic).readCompacted(false).startMessageId(MessageId.earliest).create()) {
            for (Pair<String, String> expected : published) {
                Message<byte[]> message = reader.readNext(2, TimeUnit.SECONDS);
                Assert.assertNotNull(message, "Timed out waiting for message with key " + expected.getLeft());
                Assert.assertEquals(expected.getLeft(), message.getKey());
                Assert.assertEquals(expected.getRight(), new String(message.getData()));
            }
        }
    }

    /**
     * Verifies that a {@code readCompacted(true)} consumer sees every message while the topic has
     * not been compacted, and only the last value for the key once it has.
     *
     * <p>Three messages are published under "key0". Before compaction the consumer receives
     * content0, content1 and content2 in order; after compaction a new consumer on the same
     * subscription receives content2 first.
     */
    @Test
    public void testReadCompactedBeforeCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).create();

        // Open and immediately close "sub1" before publishing.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        producer.newMessage().key("key0").value("content0".getBytes()).send();
        producer.newMessage().key("key0").value("content1".getBytes()).send();
        producer.newMessage().key("key0").value("content2".getBytes()).send();

        // Not compacted yet: all three messages come through. Nothing is acknowledged here.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key0");
            Assert.assertEquals(first.getData(), "content0".getBytes());

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key0");
            Assert.assertEquals(second.getData(), "content1".getBytes());

            Message<byte[]> third = consumer.receive();
            Assert.assertEquals(third.getKey(), "key0");
            Assert.assertEquals(third.getData(), "content2".getBytes());
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // Compacted: the first message delivered is the latest value for the key.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> latest = consumer.receive();
            Assert.assertEquals(latest.getKey(), "key0");
            Assert.assertEquals(latest.getData(), "content2".getBytes());
        }
    }

    /**
     * Verifies that messages published after a compaction run are still delivered to a
     * {@code readCompacted(true)} consumer, following the compacted data.
     *
     * <p>content0..content2 are published under "key0" and compacted, then content3 is published.
     * The consumer is expected to receive content2 followed by content3.
     */
    @Test
    public void testReadEntriesAfterCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).create();

        // Open and immediately close "sub1" before publishing.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        producer.newMessage().key("key0").value("content0".getBytes()).send();
        producer.newMessage().key("key0").value("content1".getBytes()).send();
        producer.newMessage().key("key0").value("content2".getBytes()).send();

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // Published after the compaction run.
        producer.newMessage().key("key0").value("content3".getBytes()).send();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> compacted = consumer.receive();
            Assert.assertEquals(compacted.getKey(), "key0");
            Assert.assertEquals(compacted.getData(), "content2".getBytes());

            Message<byte[]> afterCompaction = consumer.receive();
            Assert.assertEquals(afterCompaction.getKey(), "key0");
            Assert.assertEquals(afterCompaction.getData(), "content3".getBytes());
        }
    }

    /**
     * Verifies what a consumer sees after seeking to {@code MessageId.earliest} on a compacted
     * topic: with {@code readCompacted(true)} only the latest value (content2) for "key0"; with
     * {@code readCompacted(false)} all three published messages in order.
     */
    @Test
    public void testSeekEarliestAfterCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).create();

        producer.newMessage().key("key0").value("content0".getBytes()).send();
        producer.newMessage().key("key0").value("content1".getBytes()).send();
        producer.newMessage().key("key0").value("content2".getBytes()).send();

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // Compacted view after seeking back to the start.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            consumer.seek(MessageId.earliest);
            Message<byte[]> latest = consumer.receive();
            Assert.assertEquals(latest.getKey(), "key0");
            Assert.assertEquals(latest.getData(), "content2".getBytes());
        }

        // Non-compacted view after seeking back to the start.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(false).subscribe()) {
            consumer.seek(MessageId.earliest);

            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key0");
            Assert.assertEquals(first.getData(), "content0".getBytes());

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key0");
            Assert.assertEquals(second.getData(), "content1".getBytes());

            Message<byte[]> third = consumer.receive();
            Assert.assertEquals(third.getKey(), "key0");
            Assert.assertEquals(third.getData(), "content2".getBytes());
        }
    }

    /**
     * Verifies that the compacted view of a topic is still served after the broker is stopped and
     * started again.
     *
     * <p>Steps: publish three values under "key0", compact, confirm the consumer receives content2;
     * stop the broker and confirm that consuming fails with a {@link PulsarClientException}; start
     * the broker and confirm that a new consumer on "sub1" receives content2 again.
     */
    @Test
    public void testBrokerRestartAfterCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).create();

        // Open and immediately close "sub1" before publishing.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        producer.newMessage().key("key0").value("content0".getBytes()).send();
        producer.newMessage().key("key0").value("content1".getBytes()).send();
        producer.newMessage().key("key0").value("content2".getBytes()).send();

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // Baseline before the restart. Nothing is acknowledged here.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> latest = consumer.receive();
            Assert.assertEquals(latest.getKey(), "key0");
            Assert.assertEquals(latest.getData(), "content2".getBytes());
        }

        stopBroker();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            consumer.receive();
            Assert.fail("Shouldn't have been able to receive anything");
        } catch (PulsarClientException expected) {
            // Expected: with the broker stopped, subscribing or receiving must fail.
        }
        startBroker();

        // Same expectation after the restart.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> latest = consumer.receive();
            Assert.assertEquals(latest.getKey(), "key0");
            Assert.assertEquals(latest.getData(), "content2".getBytes());
        }
    }

    /**
     * Publishes a single message under "key0" after a compactor has been created for a
     * still-empty topic, and checks that a {@code readCompacted(true)} consumer receives it.
     */
    @Test
    public void testCompactEmptyTopic() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).create();

        // Open and immediately close "sub1" before publishing.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // NOTE(review): the compactor is constructed but compact(topic) is never invoked, so despite
        // its name this test does not run a compaction on the empty topic. Left unchanged because
        // running it could alter the test outcome - confirm whether the call was dropped on purpose.
        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);

        producer.newMessage().key("key0").value("content0".getBytes()).send();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> message = consumer.receive();
            Assert.assertEquals(message.getKey(), "key0");
            Assert.assertEquals(message.getData(), "content0".getBytes());
        }
    }

    /**
     * Verifies that compaction preserves message ids: the messages delivered after compaction carry
     * the same ids they had before it.
     *
     * <p>key1 -> my-message-1, key2 -> my-message-2 and key2 -> my-message-3 are published
     * unbatched. After compaction the consumer is expected to receive my-message-1 with the id of
     * the first original message, then my-message-3 with the id of the third.
     */
    @Test
    public void testFirstMessageRetained() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Open and immediately close "sub1" before publishing.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is scoped to the publishing step only, so it is closed exactly once.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // Only the last publish is synchronous.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        // Record the three messages as delivered before compaction. Nothing is acknowledged here.
        ArrayList<Message<byte[]>> beforeCompaction = new ArrayList<>();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            beforeCompaction.add(consumer.receive());
            beforeCompaction.add(consumer.receive());
            beforeCompaction.add(consumer.receive());
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> firstKey = consumer.receive();
            Assert.assertEquals(firstKey.getKey(), "key1");
            Assert.assertEquals(new String(firstKey.getData()), "my-message-1");
            Assert.assertEquals(firstKey.getMessageId(), beforeCompaction.get(0).getMessageId());

            Message<byte[]> secondKey = consumer.receive();
            Assert.assertEquals(secondKey.getKey(), "key2");
            Assert.assertEquals(new String(secondKey.getData()), "my-message-3");
            Assert.assertEquals(secondKey.getMessageId(), beforeCompaction.get(2).getMessageId());
        }
    }

    /**
     * Checks that compaction does not change the message ids of batched messages.
     *
     * <p>Three keyed messages are published through a batching producer, read back once before
     * compaction and asserted to share a ledger id and entry id. After compaction, a compacted
     * reader must receive the latest message of each key with the very same message id that was
     * observed before compaction.
     *
     * @throws Exception if any client, broker or compaction call fails
     */
    @Test
    public void testBatchMessageIdsDontChange() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create (and immediately close) the "sub1" subscription before publishing anything;
        // presumably so that the consumers opened on "sub1" below start from the first message.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // Up to 3 messages per batch with a one hour publish delay: the third, blocking send()
        // is the one that fills the batch.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        // Snapshot of the three messages as delivered before compaction.
        ArrayList<Message<byte[]>> beforeCompaction = new ArrayList<>();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()) {
            beforeCompaction.add(consumer.receive());
            beforeCompaction.add(consumer.receive());
            beforeCompaction.add(consumer.receive());
        }

        // All three messages must live in the same ledger and the same entry.
        // NOTE(review): getLedgerId()/getEntryId() are called directly on the returned message id,
        // exactly as before this change; confirm the MessageId type in scope exposes them.
        Assert.assertEquals(beforeCompaction.get(0).getMessageId().getLedgerId(), beforeCompaction.get(1).getMessageId().getLedgerId());
        Assert.assertEquals(beforeCompaction.get(0).getMessageId().getLedgerId(), beforeCompaction.get(2).getMessageId().getLedgerId());
        Assert.assertEquals(beforeCompaction.get(0).getMessageId().getEntryId(), beforeCompaction.get(1).getMessageId().getEntryId());
        Assert.assertEquals(beforeCompaction.get(0).getMessageId().getEntryId(), beforeCompaction.get(2).getMessageId().getEntryId());

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        // After compaction only the latest message per key is expected, with unchanged ids.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");
            Assert.assertEquals(first.getMessageId(), beforeCompaction.get(0).getMessageId());

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
            Assert.assertEquals(second.getMessageId(), beforeCompaction.get(2).getMessageId());
        }
    }

    /**
     * Checks compaction when every message of a batch is superseded by a later message.
     *
     * <p>A batching producer publishes three messages for {@code key1}; a non-batching producer
     * then publishes a fourth message for the same key. After compaction, the first message a
     * compacted reader receives must be that fourth message.
     *
     * @throws Exception if any client, broker or compaction call fails
     */
    @Test
    public void testWholeBatchCompactedOut() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create (and immediately close) the "sub1" subscription before publishing anything;
        // presumably so that the consumer opened on "sub1" below starts from the first message.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The non-batching producer stays open until the end of the test, as it did before this
        // change; only the batching producer is closed right after publishing.
        try (Producer<byte[]> plainProducer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .maxPendingMessages(3)
                    .enableBatching(true)
                    .batchingMaxMessages(3)
                    .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create()) {
                batchProducer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
                batchProducer.newMessage().key("key1").value("my-message-2".getBytes()).sendAsync();
                batchProducer.newMessage().key("key1").value("my-message-3".getBytes()).sendAsync();
                // Same key again, outside the batch: this is the value expected to survive.
                plainProducer.newMessage().key("key1").value("my-message-4".getBytes()).send();
            }

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub1")
                    .readCompacted(true)
                    .subscribe()) {
                Message<byte[]> survivor = consumer.receive();
                Assert.assertEquals(survivor.getKey(), "key1");
                Assert.assertEquals(new String(survivor.getData()), "my-message-4");
            }
        }
    }

    /**
     * Checks that messages without a key are still delivered after compaction.
     *
     * <p>Key-less and keyed messages are interleaved across a default producer and a batching
     * producer. After compaction, a compacted reader must receive every key-less message in
     * publish order, and only the latest value for each of {@code key1} and {@code key2}.
     *
     * @throws Exception if any client, broker or compaction call fails
     */
    @Test
    public void testKeyLessMessagesPassThrough() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create (and immediately close) the "sub1" subscription before publishing anything;
        // presumably so that the consumer opened on "sub1" below starts from the first message.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The default producer stays open until the end of the test, as it did before this
        // change; only the batching producer is closed right after publishing.
        try (Producer<byte[]> plainProducer = this.pulsarClient.newProducer().topic(topic).create()) {
            try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .maxPendingMessages(3)
                    .enableBatching(true)
                    .batchingMaxMessages(3)
                    .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                    .create()) {
                plainProducer.newMessage().value("my-message-1".getBytes()).send();

                // First group of three through the batching producer.
                batchProducer.newMessage().value("my-message-2".getBytes()).sendAsync();
                batchProducer.newMessage().key("key1").value("my-message-3".getBytes()).sendAsync();
                batchProducer.newMessage().key("key1").value("my-message-4".getBytes()).send();

                // Second group of three through the batching producer.
                batchProducer.newMessage().key("key2").value("my-message-5".getBytes()).sendAsync();
                batchProducer.newMessage().key("key2").value("my-message-6".getBytes()).sendAsync();
                batchProducer.newMessage().value("my-message-7".getBytes()).send();

                plainProducer.newMessage().value("my-message-8".getBytes()).send();
            }

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub1")
                    .readCompacted(true)
                    .subscribe()) {
                Message<byte[]> msg1 = consumer.receive();
                Assert.assertFalse(msg1.hasKey());
                Assert.assertEquals(new String(msg1.getData()), "my-message-1");

                Message<byte[]> msg2 = consumer.receive();
                Assert.assertFalse(msg2.hasKey());
                Assert.assertEquals(new String(msg2.getData()), "my-message-2");

                Message<byte[]> msg3 = consumer.receive();
                Assert.assertEquals(msg3.getKey(), "key1");
                Assert.assertEquals(new String(msg3.getData()), "my-message-4");

                Message<byte[]> msg4 = consumer.receive();
                Assert.assertEquals(msg4.getKey(), "key2");
                Assert.assertEquals(new String(msg4.getData()), "my-message-6");

                Message<byte[]> msg5 = consumer.receive();
                Assert.assertFalse(msg5.hasKey());
                Assert.assertEquals(new String(msg5.getData()), "my-message-7");

                Message<byte[]> msg6 = consumer.receive();
                Assert.assertFalse(msg6.hasKey());
                Assert.assertEquals(new String(msg6.getData()), "my-message-8");
            }
        }
    }

    /**
     * Checks that a keyed message published without a value removes that key during compaction.
     *
     * <p>Keys {@code key1}, {@code key2} and {@code key3} each end with a value-less message.
     * {@code key4} receives a value-less message followed by a final value. After compaction, a
     * compacted reader must receive {@code key0} and then {@code key4} with its final value.
     *
     * @throws Exception if any client, broker or compaction call fails
     */
    @Test
    public void testEmptyPayloadDeletes() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create (and immediately close) the "sub1" subscription before publishing anything;
        // presumably so that the consumer opened on "sub1" below starts from the first message.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The non-batching producer stays open until the end of the test, as it did before this
        // change; only the batching producer is closed right after publishing.
        try (Producer<byte[]> plainProducer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create()) {
            try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .maxPendingMessages(3)
                    .enableBatching(true)
                    .batchingMaxMessages(3)
                    .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                    .create()) {
                // key0 keeps its value; key1 is followed by a value-less message.
                plainProducer.newMessage().key("key0").value("my-message-0".getBytes()).send();
                plainProducer.newMessage().key("key1").value("my-message-1".getBytes()).send();
                plainProducer.newMessage().key("key1").send();

                // key2 gets a value and then a value-less message through the batching producer.
                batchProducer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
                batchProducer.newMessage().key("key3").value("my-message-3".getBytes()).sendAsync();
                batchProducer.newMessage().key("key2").send();

                // key3 and key4 end with value-less messages through the batching producer.
                batchProducer.newMessage().key("key3").sendAsync();
                batchProducer.newMessage().key("key4").value("my-message-3".getBytes()).sendAsync();
                batchProducer.newMessage().key("key4").send();

                // key4 receives a value again after its value-less message.
                plainProducer.newMessage().key("key4").value("my-message-4".getBytes()).send();
            }

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub1")
                    .readCompacted(true)
                    .subscribe()) {
                Message<byte[]> first = consumer.receive();
                Assert.assertEquals(first.getKey(), "key0");
                Assert.assertEquals(new String(first.getData()), "my-message-0");

                Message<byte[]> second = consumer.receive();
                Assert.assertEquals(second.getKey(), "key4");
                Assert.assertEquals(new String(second.getData()), "my-message-4");
            }
        }
    }

    /**
     * Same scenario as value-less "delete" messages under compaction, but with both producers
     * configured for LZ4 compression.
     *
     * <p>Keys {@code key1}, {@code key2} and {@code key3} each end with a value-less message.
     * {@code key4} receives a value-less message followed by a final value. After compaction, a
     * compacted reader must receive {@code key0} and then {@code key4} with its final value.
     *
     * @throws Exception if any client, broker or compaction call fails
     */
    @Test
    public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create (and immediately close) the "sub1" subscription before publishing anything;
        // presumably so that the consumer opened on "sub1" below starts from the first message.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The non-batching producer stays open until the end of the test, as it did before this
        // change; only the batching producer is closed right after publishing.
        try (Producer<byte[]> plainProducer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .compressionType(CompressionType.LZ4)
                .create()) {
            try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .maxPendingMessages(3)
                    .enableBatching(true)
                    .compressionType(CompressionType.LZ4)
                    .batchingMaxMessages(3)
                    .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                    .create()) {
                // key0 keeps its value; key1 is followed by a value-less message.
                plainProducer.newMessage().key("key0").value("my-message-0".getBytes()).send();
                plainProducer.newMessage().key("key1").value("my-message-1".getBytes()).send();
                plainProducer.newMessage().key("key1").send();

                // key2 gets a value and then a value-less message through the batching producer.
                batchProducer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
                batchProducer.newMessage().key("key3").value("my-message-3".getBytes()).sendAsync();
                batchProducer.newMessage().key("key2").send();

                // key3 and key4 end with value-less messages through the batching producer.
                batchProducer.newMessage().key("key3").sendAsync();
                batchProducer.newMessage().key("key4").value("my-message-3".getBytes()).sendAsync();
                batchProducer.newMessage().key("key4").send();

                // key4 receives a value again after its value-less message.
                plainProducer.newMessage().key("key4").value("my-message-4".getBytes()).send();
            }

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub1")
                    .readCompacted(true)
                    .subscribe()) {
                Message<byte[]> first = consumer.receive();
                Assert.assertEquals(first.getKey(), "key0");
                Assert.assertEquals(new String(first.getData()), "my-message-0");

                Message<byte[]> second = consumer.receive();
                Assert.assertEquals(second.getKey(), "key4");
                Assert.assertEquals(new String(second.getData()), "my-message-4");
            }
        }
    }

    /**
     * Checks which ledgers are opened through the mocked BookKeeper while compacting.
     *
     * <p>Every ledger id passed to {@code OpenBuilder.withLedgerId} is recorded. The topic is
     * closed between publishes so that the managed ledger ends up with two, then three, ledgers.
     * The assertions require that the first compaction opens only the first ledger, and that the
     * second compaction opens only the second ledger. Finally a compacted reader must receive all
     * three keys in publish order.
     *
     * @throws Exception if any client, broker or compaction call fails
     */
    @Test
    public void testCompactorReadsCompacted() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Spy on every OpenBuilder handed out by the mock so the requested ledger ids are recorded.
        // The two lambdas need distinct parameter names: a nested lambda may not redeclare the
        // parameter of the enclosing one.
        Set<Long> openedLedgers = Sets.newConcurrentHashSet();
        Mockito.when(this.mockBookKeeper.newOpenLedgerOp()).thenAnswer(openOpInvocation -> {
            OpenBuilder spiedBuilder = (OpenBuilder) Mockito.spy(openOpInvocation.callRealMethod());
            Mockito.when(spiedBuilder.withLedgerId(Mockito.anyLong())).thenAnswer(withIdInvocation -> {
                openedLedgers.add((Long) withIdInvocation.getArguments()[0]);
                return withIdInvocation.callRealMethod();
            });
            return spiedBuilder;
        });

        // Create (and immediately close) the "sub1" subscription before publishing anything.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            producer.newMessage().key("key0").value("my-message-0".getBytes()).send();
        }

        // Close the topic before publishing again; the size assertion below expects a second ledger.
        ((Topic) this.pulsar.getBrokerService().getTopicReference(topic).get()).close().get();

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).send();
        }

        String managedLedgerName = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic).get()).getManagedLedger().getName();
        ManagedLedgerInfo infoBefore = this.pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
        Assert.assertEquals(infoBefore.ledgers.size(), 2);
        Assert.assertTrue(openedLedgers.isEmpty());

        Long firstLedger = Long.valueOf(((ManagedLedgerInfo.LedgerInfo) infoBefore.ledgers.get(0)).ledgerId);
        Long secondLedger = Long.valueOf(((ManagedLedgerInfo.LedgerInfo) infoBefore.ledgers.get(1)).ledgerId);

        // First compaction: only the first ledger is expected to be opened.
        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();
        Assert.assertTrue(openedLedgers.contains(firstLedger));
        Assert.assertFalse(openedLedgers.contains(secondLedger));
        openedLedgers.clear();

        // Close the topic again; the size assertion below expects a third ledger.
        ((Topic) this.pulsar.getBrokerService().getTopicReference(topic).get()).close().get();

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            producer.newMessage().key("key2").value("my-message-2".getBytes()).send();
        }

        ManagedLedgerInfo infoAfter = this.pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
        Assert.assertEquals(infoAfter.ledgers.size(), 3);

        Long ledger0 = Long.valueOf(((ManagedLedgerInfo.LedgerInfo) infoAfter.ledgers.get(0)).ledgerId);
        Long ledger1 = Long.valueOf(((ManagedLedgerInfo.LedgerInfo) infoAfter.ledgers.get(1)).ledgerId);
        Long ledger2 = Long.valueOf(((ManagedLedgerInfo.LedgerInfo) infoAfter.ledgers.get(2)).ledgerId);

        // Reopening the topic and publishing must not have opened any of the data ledgers.
        Assert.assertFalse(openedLedgers.contains(ledger0));
        Assert.assertFalse(openedLedgers.contains(ledger1));
        Assert.assertFalse(openedLedgers.contains(ledger2));
        openedLedgers.clear();

        // Second compaction: the first ledger must not be opened again, only the second one.
        compactor.compact(topic).get();
        Assert.assertFalse(openedLedgers.contains(ledger0));
        Assert.assertTrue(openedLedgers.contains(ledger1));
        Assert.assertFalse(openedLedgers.contains(ledger2));

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key0");
            Assert.assertEquals(new String(first.getData()), "my-message-0");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key1");
            Assert.assertEquals(new String(second.getData()), "my-message-1");

            Message<byte[]> third = consumer.receive();
            Assert.assertEquals(third.getKey(), "key2");
            Assert.assertEquals(new String(third.getData()), "my-message-2");
        }
    }

    /**
     * Publishes three LZ4-compressed, non-batched messages (two of them sharing
     * {@code key2}), runs a two-phase compaction, and checks that a
     * {@code readCompacted} consumer receives {@code key1 -> my-message-1}
     * followed by {@code key2 -> my-message-3}.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testCompactCompressedNoBatch() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is closed exactly once, before compaction starts.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .compressionType(CompressionType.LZ4).enableBatching(false).create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // The final synchronous send blocks until the broker has acknowledged it.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Publishes three LZ4-compressed messages through a batching producer
     * (batch size 3, one-hour publish delay), runs a two-phase compaction, and
     * checks that a {@code readCompacted} consumer receives
     * {@code key1 -> my-message-1} followed by {@code key2 -> my-message-3}.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testCompactCompressedBatching() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is closed exactly once, before compaction starts.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .compressionType(CompressionType.LZ4)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // Third message reaches batchingMaxMessages; send() waits for the acknowledgement.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Publishes three encrypted, non-batched messages (two of them sharing
     * {@code key2}), runs a two-phase compaction, and checks that a
     * {@code readCompacted} consumer configured with an {@code EncKeyReader}
     * receives {@code key1 -> my-message-1} followed by
     * {@code key2 -> my-message-3}.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testCompactEncryptedNoBatch() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is closed exactly once, before compaction starts.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .enableBatching(false)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // The final synchronous send blocks until the broker has acknowledged it.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()).readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Publishes three encrypted messages through a batching producer (batch
     * size 3, one-hour publish delay), runs a two-phase compaction, and checks
     * what a {@code readCompacted} consumer sees afterwards.
     *
     * <p>The assertions expect all three messages back, including the
     * superseded {@code key2 -> my-message-2}.
     * NOTE(review): presumably the encrypted batch is kept whole because its
     * individual keys cannot be inspected during compaction; confirm against
     * the compactor implementation.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testCompactEncryptedBatching() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is closed exactly once, before compaction starts.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // Third message reaches batchingMaxMessages; send() waits for the acknowledgement.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()).readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-2");

            Message<byte[]> third = consumer.receive();
            Assert.assertEquals(third.getKey(), "key2");
            Assert.assertEquals(new String(third.getData()), "my-message-3");
        }
    }

    /**
     * Publishes three encrypted, LZ4-compressed, non-batched messages (two of
     * them sharing {@code key2}), runs a two-phase compaction, and checks that
     * a {@code readCompacted} consumer configured with an {@code EncKeyReader}
     * receives {@code key1 -> my-message-1} followed by
     * {@code key2 -> my-message-3}.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is closed exactly once, before compaction starts.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .compressionType(CompressionType.LZ4)
                .enableBatching(false)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // The final synchronous send blocks until the broker has acknowledged it.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()).readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Publishes three encrypted, LZ4-compressed messages through a batching
     * producer (batch size 3, one-hour publish delay), runs a two-phase
     * compaction, and checks what a {@code readCompacted} consumer sees
     * afterwards.
     *
     * <p>The assertions expect all three messages back, including the
     * superseded {@code key2 -> my-message-2}.
     * NOTE(review): presumably the encrypted batch is kept whole because its
     * individual keys cannot be inspected during compaction; confirm against
     * the compactor implementation.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testCompactEncryptedAndCompressedBatching() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The producer is closed exactly once, before compaction starts.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .compressionType(CompressionType.LZ4)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // Third message reaches batchingMaxMessages; send() waits for the acknowledgement.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()).readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-2");

            Message<byte[]> third = consumer.receive();
            Assert.assertEquals(third.getKey(), "key2");
            Assert.assertEquals(new String(third.getData()), "my-message-3");
        }
    }

    /**
     * Mixes encrypted messages from a non-batching and a batching producer,
     * some of them sent without a value, runs a two-phase compaction, and
     * checks the sequence a {@code readCompacted} consumer receives.
     *
     * <p>Expected sequence per the assertions: {@code key0 -> my-message-0},
     * {@code key2 -> my-message-2}, {@code key3 -> my-message-3},
     * {@code key2 -> ""} and {@code key4 -> my-message-4}. {@code key1}, whose
     * last non-batched message had no value, is not delivered, while the
     * batched messages all come back, including the empty {@code key2} one.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test
    public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create and immediately close the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The non-batching producer stays open until the end of the test, as before.
        try (Producer<byte[]> plainProducer = this.pulsarClient.newProducer().topic(topic)
                .enableBatching(false)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .create()) {

            // The batching producer is closed once all messages are published.
            try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer().topic(topic)
                    .maxPendingMessages(3)
                    .enableBatching(true)
                    .addEncryptionKey("client-ecdsa.pem")
                    .cryptoKeyReader(new EncKeyReader())
                    .batchingMaxMessages(3)
                    .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                    .create()) {
                plainProducer.newMessage().key("key0").value("my-message-0".getBytes()).send();

                // key1 is written and then overwritten by a message without a value.
                plainProducer.newMessage().key("key1").value("my-message-1".getBytes()).send();
                plainProducer.newMessage().key("key1").send();

                // One batch of three: key2, key3 and a value-less key2.
                batchProducer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
                batchProducer.newMessage().key("key3").value("my-message-3".getBytes()).sendAsync();
                batchProducer.newMessage().key("key2").send();

                plainProducer.newMessage().key("key4").value("my-message-4".getBytes()).send();
            }

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                    .cryptoKeyReader(new EncKeyReader()).subscriptionName("sub1").readCompacted(true).subscribe()) {
                Message<byte[]> first = consumer.receive();
                Assert.assertEquals(first.getKey(), "key0");
                Assert.assertEquals(new String(first.getData()), "my-message-0");

                Message<byte[]> second = consumer.receive();
                Assert.assertEquals(second.getKey(), "key2");
                Assert.assertEquals(new String(second.getData()), "my-message-2");

                Message<byte[]> third = consumer.receive();
                Assert.assertEquals(third.getKey(), "key3");
                Assert.assertEquals(new String(third.getData()), "my-message-3");

                Message<byte[]> fourth = consumer.receive();
                Assert.assertEquals(fourth.getKey(), "key2");
                Assert.assertEquals(new String(fourth.getData()), "");

                Message<byte[]> fifth = consumer.receive();
                Assert.assertEquals(fifth.getKey(), "key4");
                Assert.assertEquals(new String(fifth.getData()), "my-message-4");
            }
        }
    }

    /**
     * Publishes values for keys {@code "1"}, {@code "2"} and {@code "3"},
     * then empty values for {@code "1"} and {@code "2"}, runs a two-phase
     * compaction, and checks that the first message a {@code readCompacted}
     * consumer receives within two seconds carries key {@code "3"}.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test(timeOut = 20000)
    public void testCompactionWithLastDeletedKey() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // The producer was previously never closed; try-with-resources releases it at the end.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            // Create and immediately close the subscription before anything is published.
            this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

            producer.newMessage().key("1").value("1".getBytes()).send();
            producer.newMessage().key("2").value("2".getBytes()).send();
            producer.newMessage().key("3").value("3".getBytes()).send();
            // Empty payloads for keys "1" and "2" come last.
            producer.newMessage().key("1").value("".getBytes()).send();
            producer.newMessage().key("2").value("".getBytes()).send();

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            Set<String> expectedKeys = Sets.newHashSet("3");
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                    .subscriptionName("sub1").readCompacted(true).subscribe()) {
                Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertTrue(expectedKeys.remove(message.getKey()));
            }
        }
    }

    /**
     * Publishes values for keys {@code "1"} and {@code "2"}, then empty
     * values for both, runs a two-phase compaction, and checks that a
     * {@code readCompacted} consumer receives nothing within two seconds.
     *
     * @throws Exception if publishing, compaction or consumption fails
     */
    @Test(timeOut = 20000)
    public void testEmptyCompactionLedger() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // The producer was previously never closed; try-with-resources releases it at the end.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            // Create and immediately close the subscription before anything is published.
            this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

            producer.newMessage().key("1").value("1".getBytes()).send();
            producer.newMessage().key("2").value("2".getBytes()).send();
            // Every key ends with an empty payload.
            producer.newMessage().key("1").value("".getBytes()).send();
            producer.newMessage().key("2").value("".getBytes()).send();

            new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(topic).get();

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                    .subscriptionName("sub1").readCompacted(true).subscribe()) {
                // receive(timeout) returning null means no message arrived in time.
                Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));
            }
        }
    }
}
