package org.apache.pulsar.compaction;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.RawReaderTest;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/compaction/CompactorTest.class */
public class CompactorTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(CompactorTest.class);
    private ScheduledExecutorService compactionScheduler;

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT));
        this.admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
        this.compactionScheduler.shutdownNow();
    }

    private List<String> compactAndVerify(String str, Map<String, byte[]> map) throws Exception {
        BookKeeper create = this.pulsar.getBookKeeperClientFactory().create(this.conf, (ZooKeeper) null);
        LedgerHandle openLedger = create.openLedger(((Long) new TwoPhaseCompactor(this.conf, this.pulsarClient, create, this.compactionScheduler).compact(str).get()).longValue(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        Assert.assertEquals(openLedger.getLastAddConfirmed() + 1, map.size(), "Should have as many entries as there is keys");
        ArrayList arrayList = new ArrayList();
        Enumeration readEntries = openLedger.readEntries(0L, openLedger.getLastAddConfirmed());
        while (readEntries.hasMoreElements()) {
            RawMessage deserializeFrom = RawMessageImpl.deserializeFrom(((LedgerEntry) readEntries.nextElement()).getEntryBuffer());
            String extractKey = RawReaderTest.extractKey(deserializeFrom);
            arrayList.add(extractKey);
            ByteBuf extractPayload = extractPayload(deserializeFrom);
            byte[] bArr = new byte[extractPayload.readableBytes()];
            extractPayload.readBytes(bArr);
            Assert.assertEquals(bArr, map.remove(extractKey), "Compacted version should match expected version");
            deserializeFrom.close();
        }
        Assert.assertTrue(map.isEmpty(), "All expected keys should have been found");
        return arrayList;
    }

    @Test
    public void testCompaction() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        HashMap hashMap = new HashMap();
        Random random = new Random(0L);
        for (int i = 0; i < 1000; i++) {
            String str = "key" + random.nextInt(10);
            byte[] bytes = ("my-message-" + str + "-" + i).getBytes();
            createProducer.send(MessageBuilder.create().setKey(str).setContent(bytes).build());
            hashMap.put(str, bytes);
        }
        compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap);
    }

    @Test
    public void testCompactAddCompact() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        HashMap hashMap = new HashMap();
        createProducer.send(MessageBuilder.create().setKey("a").setContent("A_1".getBytes()).build());
        createProducer.send(MessageBuilder.create().setKey("b").setContent("B_1".getBytes()).build());
        createProducer.send(MessageBuilder.create().setKey("a").setContent("A_2".getBytes()).build());
        hashMap.put("a", "A_2".getBytes());
        hashMap.put("b", "B_1".getBytes());
        compactAndVerify("persistent://my-property/use/my-ns/my-topic1", new HashMap(hashMap));
        createProducer.send(MessageBuilder.create().setKey("b").setContent("B_2".getBytes()).build());
        hashMap.put("b", "B_2".getBytes());
        compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap);
    }

    @Test
    public void testCompactedInOrder() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        createProducer.send(MessageBuilder.create().setKey("c").setContent("C_1".getBytes()).build());
        createProducer.send(MessageBuilder.create().setKey("a").setContent("A_1".getBytes()).build());
        createProducer.send(MessageBuilder.create().setKey("b").setContent("B_1".getBytes()).build());
        createProducer.send(MessageBuilder.create().setKey("a").setContent("A_2".getBytes()).build());
        HashMap hashMap = new HashMap();
        hashMap.put("a", "A_2".getBytes());
        hashMap.put("b", "B_1".getBytes());
        hashMap.put("c", "C_1".getBytes());
        Assert.assertEquals(compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap), Lists.newArrayList(new String[]{"c", "b", "a"}));
    }

    @Test(expectedExceptions = {ExecutionException.class})
    public void testCompactEmptyTopic() throws Exception {
        this.pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "sub1").close();
        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.pulsar.getBookKeeperClientFactory().create(this.conf, (ZooKeeper) null), this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1").get();
    }

    public ByteBuf extractPayload(RawMessage rawMessage) throws Exception {
        ByteBuf headersAndPayload = rawMessage.getHeadersAndPayload();
        Commands.readChecksum(headersAndPayload);
        headersAndPayload.readBytes(new byte[headersAndPayload.readInt()]);
        return headersAndPayload.slice();
    }
}
