package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javassist.compiler.TokenId;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.class */
public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
    protected String methodName;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReplicatorRateLimiterTest.class);

    /* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorRateLimiterTest$DispatchRateType.class */
    enum DispatchRateType {
        messageRate,
        byteRate
    }

    @BeforeMethod
    public void beforeMethod(Method method) throws Exception {
        this.methodName = method.getName();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass(timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void shutdown() throws Exception {
        super.shutdown();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "dispatchRateType")
    public Object[][] dispatchRateProvider() {
        return new Object[]{new Object[]{DispatchRateType.messageRate}, new Object[]{DispatchRateType.byteRate}};
    }

    @Test
    public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String str = "pulsar/replicatorchange-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/ratechange";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet("r1", "r2"));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            Assert.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            this.admin1.namespaces().setReplicatorDispatchRate(str, new DispatchRate(100, -1L, TokenId.EXOR_E));
            boolean z = false;
            int i = 0;
            while (true) {
                if (i >= 5) {
                    break;
                }
                if (((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent()) {
                    z = true;
                    break;
                } else {
                    if (i != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i++;
                }
            }
            Assert.assertTrue(z);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100);
            DispatchRate dispatchRate = new DispatchRate(-1, 500, TokenId.EXOR_E);
            this.admin1.namespaces().setReplicatorDispatchRate(str, dispatchRate);
            boolean z2 = false;
            int i2 = 0;
            while (true) {
                if (i2 >= 5) {
                    break;
                }
                if (((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte() == 500) {
                    z2 = true;
                    break;
                } else {
                    if (i2 != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i2++;
                }
            }
            Assert.assertTrue(z2);
            Assert.assertEquals(this.admin1.namespaces().getReplicatorDispatchRate(str), dispatchRate);
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "dispatchRateType")
    public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateType dispatchRateType) throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String str = "pulsar/replicatorbyteandmsg-" + dispatchRateType.toString() + "-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/notReceivedAll";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet("r1", "r2"));
        this.admin1.namespaces().setReplicatorDispatchRate(str, DispatchRateType.messageRate.equals(dispatchRateType) ? new DispatchRate(100, -1L, TokenId.EXOR_E) : new DispatchRate(-1, 100L, TokenId.EXOR_E));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer<byte[]> create = build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            boolean z = false;
            int i = 0;
            while (true) {
                if (i >= 5) {
                    break;
                }
                if (((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent()) {
                    z = true;
                    break;
                } else {
                    if (i != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i++;
                }
            }
            Assert.assertTrue(z);
            if (DispatchRateType.messageRate.equals(dispatchRateType)) {
                Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100L);
            } else {
                Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 100L);
            }
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                Consumer<byte[]> subscribe = build.newConsumer().topic(str2).subscriptionName("sub2-in-cluster2").messageListener((consumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(message.getData()));
                    atomicInteger.incrementAndGet();
                }).subscribe();
                for (int i2 = 0; i2 < 500; i2++) {
                    create.send(new byte[80]);
                }
                log.info("Received message number: [{}]", Integer.valueOf(atomicInteger.get()));
                Assert.assertTrue(atomicInteger.get() < 200);
                subscribe.close();
                create.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String str = "pulsar/replicatormsg-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/notReceivedAll";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet("r1", "r2"));
        this.admin1.namespaces().setReplicatorDispatchRate(str, new DispatchRate(100, -1L, TokenId.EXOR_E));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer<byte[]> create = build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            boolean z = false;
            int i = 0;
            while (true) {
                if (i >= 5) {
                    break;
                }
                if (((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent()) {
                    z = true;
                    break;
                } else {
                    if (i != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i++;
                }
            }
            Assert.assertTrue(z);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100L);
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                Consumer<byte[]> subscribe = build.newConsumer().topic(str2).subscriptionName("sub2-in-cluster2").messageListener((consumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(message.getData()));
                    atomicInteger.incrementAndGet();
                }).subscribe();
                for (int i2 = 0; i2 < 50; i2++) {
                    create.send(new byte[80]);
                }
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", Integer.valueOf(atomicInteger.get()));
                Assert.assertEquals(atomicInteger.get(), 50);
                for (int i3 = 0; i3 < 200; i3++) {
                    create.send(new byte[80]);
                }
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", Integer.valueOf(atomicInteger.get()));
                Assert.assertEquals(atomicInteger.get(), 100);
                subscribe.close();
                create.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1071577314:
                if (implMethodName.equals("lambda$testReplicatorRateLimiterMessageNotReceivedAllMessages$26ef232c$1")) {
                    z = true;
                    break;
                }
                break;
            case 1747212594:
                if (implMethodName.equals("lambda$testReplicatorRateLimiterMessageReceivedAllMessages$7c7b6d72$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/ReplicatorRateLimiterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        atomicInteger.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/ReplicatorRateLimiterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer2, message2) -> {
                        Assert.assertNotNull(message2, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message2.getData()));
                        atomicInteger2.incrementAndGet();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
