package org.apache.pulsar.compaction;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.TopicCompactionStrategyTest;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/compaction/StrategicCompactionTest.class */
public class StrategicCompactionTest extends CompactionTest {
    private TopicCompactionStrategy strategy;
    private StrategicTwoPhaseCompactor compactor;

    @Override // org.apache.pulsar.compaction.CompactionTest, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.setup();
        this.compactor = new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        this.strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
    }

    @Override // org.apache.pulsar.compaction.CompactionTest
    protected long compact(String str) throws ExecutionException, InterruptedException {
        return ((Long) this.compactor.compact(str, this.strategy).get()).longValue();
    }

    @Override // org.apache.pulsar.compaction.CompactionTest
    protected long compact(String str, CryptoKeyReader cryptoKeyReader) throws ExecutionException, InterruptedException {
        return ((Long) this.compactor.compact(str, this.strategy, cryptoKeyReader).get()).longValue();
    }

    @Override // org.apache.pulsar.compaction.CompactionTest
    protected TwoPhaseCompactor getCompactor() {
        return this.compactor;
    }

    @Test
    public void testNumericOrderCompaction() throws Exception {
        this.strategy = new NumericOrderCompactionStrategy();
        Producer create = this.pulsarClient.newProducer(this.strategy.getSchema()).topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        Random random = new Random(0L);
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        for (int i = 0; i < 50; i++) {
            String str = "key" + random.nextInt(5);
            int nextInt = random.nextInt(i + 1);
            Integer valueOf = nextInt < i / 5 ? null : Integer.valueOf(nextInt);
            create.newMessage().key(str).value(valueOf).send();
            if (!this.strategy.shouldKeepLeft((Integer) hashMap.get(str), valueOf)) {
                if (valueOf == null) {
                    hashMap.remove(str);
                } else {
                    hashMap.put(str, valueOf);
                }
            }
            arrayList.add(Pair.of(str, valueOf));
        }
        compact("persistent://my-property/use/my-ns/my-topic1");
        PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats("persistent://my-property/use/my-ns/my-topic1", false);
        Assert.assertEquals(hashMap.size(), internalStats.compactedLedger.entries);
        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
        Assert.assertFalse(internalStats.compactedLedger.offloaded);
        HashMap hashMap2 = new HashMap(hashMap);
        Consumer subscribe = this.pulsarClient.newConsumer(this.strategy.getSchema()).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        while (!hashMap.isEmpty()) {
            try {
                Message receive = subscribe.receive(2, TimeUnit.SECONDS);
                Assert.assertEquals((Integer) receive.getValue(), (Integer) hashMap.remove(receive.getKey()), receive.getKey());
            } finally {
            }
        }
        Assert.assertTrue(hashMap.isEmpty());
        if (subscribe != null) {
            subscribe.close();
        }
        subscribe = this.pulsarClient.newConsumer(this.strategy.getSchema()).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(false).subscribe();
        do {
            try {
                Message receive2 = subscribe.receive(2, TimeUnit.SECONDS);
                Pair pair = (Pair) arrayList.remove(0);
                Assert.assertEquals(receive2.getKey(), (String) pair.getLeft());
                Assert.assertEquals((Integer) receive2.getValue(), (Integer) pair.getRight());
            } finally {
            }
        } while (!arrayList.isEmpty());
        Assert.assertTrue(arrayList.isEmpty());
        if (subscribe != null) {
            subscribe.close();
        }
        Assert.assertEquals(this.pulsar.getClient().newTableViewBuilder(this.strategy.getSchema()).topic("persistent://my-property/use/my-ns/my-topic1").loadConf(Map.of("topicCompactionStrategyClassName", this.strategy.getClass().getCanonicalName())).create().entrySet(), hashMap2.entrySet());
    }

    @Test(timeOut = 20000)
    public void testSameBatchCompactToSameBatch() throws Exception {
        String str = "persistent://my-property/use/my-ns/testSameBatchCompactToSameBatch" + UUID.randomUUID();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.INT32).compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).topic(str);
        producerBuilder.batchingMaxMessages(2).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS);
        Producer create = producerBuilder.create();
        ArrayList arrayList = new ArrayList(11);
        for (int i = 0; i < 11; i++) {
            arrayList.add(create.newMessage().key(String.valueOf(i)).value(Integer.valueOf(i)).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(str, this.strategy).get();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{str}).subscriptionName("sub1").readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        int i2 = 0;
        while (true) {
            try {
                Message receive = subscribe.receive(2, TimeUnit.SECONDS);
                if (receive == null) {
                    break;
                }
                MessageIdAdv messageId = receive.getMessageId();
                if (i2 < 10) {
                    Assert.assertEquals(messageId.getBatchSize(), 2);
                } else {
                    Assert.assertEquals(messageId.getBatchSize(), 0);
                }
                i2++;
            } catch (Throwable th) {
                if (subscribe != null) {
                    try {
                        subscribe.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        Assert.assertEquals(i2, 11);
        if (subscribe != null) {
            subscribe.close();
        }
    }
}
