package org.apache.pulsar.broker.zookeeper;

import com.google.common.util.concurrent.AtomicDouble;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.class */
public class ZooKeeperClientAspectJTest {
    private ZookeeperServerTest localZkS;
    private ZooKeeper localZkc;
    private final int LOCAL_ZOOKEEPER_PORT = PortManager.nextFreePort();
    private final long ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 2000;
    private final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    /* loaded from: input_file:org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest$MockPulsar.class */
    class MockPulsar extends BrokerTestBase {
        private final ZooKeeper zk;

        public MockPulsar(ZooKeeper zooKeeper) {
            this.zk = zooKeeper;
        }

        @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
        protected void setup() throws Exception {
            super.baseSetup();
            ((PulsarService) Mockito.doReturn(new ZooKeeperClientFactory() { // from class: org.apache.pulsar.broker.zookeeper.ZooKeeperClientAspectJTest.MockPulsar.1
                public CompletableFuture<ZooKeeper> create(String str, ZooKeeperClientFactory.SessionType sessionType, int i) {
                    return CompletableFuture.completedFuture(MockPulsar.this.zk);
                }
            }).when(this.pulsar)).getZooKeeperClientFactory();
        }

        @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
        protected void cleanup() throws Exception {
            super.internalCleanup();
        }

        @Override // org.apache.pulsar.broker.service.BrokerTestBase
        public PulsarService getPulsar() {
            return this.pulsar;
        }

        public PulsarClient getClient() {
            return this.pulsarClient;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest$ZookeeperServerTest.class */
    class ZookeeperServerTest implements Closeable {
        private ZooKeeperServer zks;
        private NIOServerCnxnFactory serverFactory;
        private final int zkPort;
        private final String hostPort;
        private final Logger log = LoggerFactory.getLogger(ZookeeperServerTest.class);
        private final File zkTmpDir = File.createTempFile("zookeeper", "test");

        public ZookeeperServerTest(int i) throws IOException {
            this.zkPort = i;
            this.hostPort = "127.0.0.1:" + i;
            this.log.info("**** Start GZK on {} ****", this.zkTmpDir);
            if (!this.zkTmpDir.delete() || !this.zkTmpDir.mkdir()) {
                throw new IOException("Couldn't create zk directory " + this.zkTmpDir);
            }
        }

        public void start() throws IOException {
            try {
                this.zks = new ZooKeeperServer(this.zkTmpDir, this.zkTmpDir, 3000);
                this.zks.setMaxSessionTimeout(20000);
                this.serverFactory = new NIOServerCnxnFactory();
                this.serverFactory.configure(new InetSocketAddress(this.zkPort), 1000);
                this.serverFactory.startup(this.zks);
            } catch (Exception e) {
                this.log.error("Exception while instantiating ZooKeeper", e);
            }
            LocalBookkeeperEnsemble.waitForServerUp(this.hostPort, 30000L);
            this.log.info("ZooKeeper started at {}", this.hostPort);
        }

        public void stop() throws IOException {
            this.zks.shutdown();
            this.serverFactory.shutdown();
            this.log.info("Stoppend ZK server at {}", this.hostPort);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.zks.shutdown();
            this.serverFactory.shutdown();
            this.zkTmpDir.delete();
        }
    }

    @Test
    public void testZkConnected() throws Exception {
        OrderedScheduler build = OrderedScheduler.newSchedulerBuilder().build();
        try {
            this.localZkc = (ZooKeeper) new ZookeeperBkClientFactoryImpl(build).create("127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT, ZooKeeperClientFactory.SessionType.ReadWrite, 2000).get(2000L, TimeUnit.MILLISECONDS);
            Assert.assertTrue(this.localZkc.getState().isConnected());
            Assert.assertNotEquals(this.localZkc.getState(), ZooKeeper.States.CONNECTEDREADONLY);
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            build.shutdown();
        } catch (Throwable th) {
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            build.shutdown();
            throw th;
        }
    }

    @BeforeMethod
    void setup() throws Exception {
        this.localZkS = new ZookeeperServerTest(this.LOCAL_ZOOKEEPER_PORT);
        this.localZkS.start();
    }

    @AfterMethod
    void teardown() throws Exception {
        this.localZkS.close();
    }

    @Test(enabled = false, timeOut = 7000)
    void testZkClientAspectJTrigger() throws Exception {
        OrderedScheduler build = OrderedScheduler.newSchedulerBuilder().build();
        this.localZkc = (ZooKeeper) new ZookeeperBkClientFactoryImpl(build).create("127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT, ZooKeeperClientFactory.SessionType.ReadWrite, 2000).get(2000L, TimeUnit.MILLISECONDS);
        try {
            Assert.assertTrue(this.localZkc.getState().isConnected());
            Assert.assertNotEquals(this.localZkc.getState(), ZooKeeper.States.CONNECTEDREADONLY);
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            final AtomicInteger atomicInteger2 = new AtomicInteger(0);
            ClientCnxnAspect.EventListner eventListner = new ClientCnxnAspect.EventListner() { // from class: org.apache.pulsar.broker.zookeeper.ZooKeeperClientAspectJTest.1
                public void recordLatency(ClientCnxnAspect.EventType eventType, long j) {
                    if (eventType.equals(ClientCnxnAspect.EventType.write)) {
                        atomicInteger.incrementAndGet();
                    } else if (eventType.equals(ClientCnxnAspect.EventType.read)) {
                        atomicInteger2.incrementAndGet();
                    }
                }
            };
            ClientCnxnAspect.addListener(eventListner);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            CountDownLatch countDownLatch4 = new CountDownLatch(1);
            this.localZkc.create("/createTest", "data".getBytes(), this.Acl, CreateMode.EPHEMERAL, (i, str, obj, str2) -> {
                countDownLatch.countDown();
            }, "create");
            this.localZkc.delete("/deleteTest", -1, (i2, str3, obj2) -> {
                countDownLatch2.countDown();
            }, "delete");
            this.localZkc.exists("/createTest", (Watcher) null, (i3, str4, obj3, stat) -> {
                countDownLatch4.countDown();
            }, (Object) null);
            this.localZkc.getData("/createTest", (Watcher) null, (i4, str5, obj4, bArr, stat2) -> {
                countDownLatch3.countDown();
            }, (Object) null);
            countDownLatch.await();
            countDownLatch2.await();
            countDownLatch4.await();
            countDownLatch3.await();
            Thread.sleep(500L);
            Assert.assertEquals(atomicInteger.get(), 2);
            Assert.assertEquals(atomicInteger2.get(), 2);
            ClientCnxnAspect.removeListener(eventListner);
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            build.shutdown();
        } catch (Throwable th) {
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            build.shutdown();
            throw th;
        }
    }

    @Test(enabled = false, timeOut = 7000)
    public void testZkOpStatsMetrics() throws Exception {
        OrderedScheduler build = OrderedScheduler.newSchedulerBuilder().build();
        this.localZkc = (ZooKeeper) new ZookeeperBkClientFactoryImpl(build).create("127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT, ZooKeeperClientFactory.SessionType.ReadWrite, 2000).get(2000L, TimeUnit.MILLISECONDS);
        MockPulsar mockPulsar = new MockPulsar(this.localZkc);
        mockPulsar.setup();
        try {
            PulsarClient client = mockPulsar.getClient();
            PulsarService pulsar = mockPulsar.getPulsar();
            client.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
            Metrics metric = getMetric(pulsar, "zk_write_latency");
            Assert.assertNotNull(metric);
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_rate_s"));
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_time_95percentile_ms"));
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_time_99_99_percentile_ms"));
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_time_99_9_percentile_ms"));
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_time_99_percentile_ms"));
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_time_mean_ms"));
            Assert.assertTrue(metric.getMetrics().containsKey("brk_zk_write_time_median_ms"));
            Metrics metric2 = getMetric(pulsar, "zk_read_latency");
            Assert.assertNotNull(metric2);
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_rate_s"));
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_time_95percentile_ms"));
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_time_99_99_percentile_ms"));
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_time_99_9_percentile_ms"));
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_time_99_percentile_ms"));
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_time_mean_ms"));
            Assert.assertTrue(metric2.getMetrics().containsKey("brk_zk_read_time_median_ms"));
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            CountDownLatch countDownLatch4 = new CountDownLatch(1);
            this.localZkc.create("/createTest", "data".getBytes(), this.Acl, CreateMode.EPHEMERAL, (i, str, obj, str2) -> {
                countDownLatch.countDown();
            }, "create");
            this.localZkc.delete("/deleteTest", -1, (i2, str3, obj2) -> {
                countDownLatch2.countDown();
            }, "delete");
            this.localZkc.exists("/createTest", (Watcher) null, (i3, str4, obj3, stat) -> {
                countDownLatch4.countDown();
            }, (Object) null);
            this.localZkc.getData("/createTest", (Watcher) null, (i4, str5, obj4, bArr, stat2) -> {
                countDownLatch3.countDown();
            }, (Object) null);
            countDownLatch.await();
            countDownLatch2.await();
            countDownLatch4.await();
            countDownLatch3.await();
            Thread.sleep(10L);
            BrokerService brokerService = pulsar.getBrokerService();
            brokerService.updateRates();
            List topicMetrics = brokerService.getTopicMetrics();
            AtomicDouble atomicDouble = new AtomicDouble();
            AtomicDouble atomicDouble2 = new AtomicDouble();
            topicMetrics.forEach(metrics -> {
                if ("zk_write_latency".equalsIgnoreCase(metrics.getDimension("metric"))) {
                    atomicDouble.set(((Double) metrics.getMetrics().get("brk_zk_write_latency_rate_s")).doubleValue());
                } else if ("zk_read_latency".equalsIgnoreCase(metrics.getDimension("metric"))) {
                    atomicDouble2.set(((Double) metrics.getMetrics().get("brk_zk_read_latency_rate_s")).doubleValue());
                }
            });
            Assert.assertTrue(atomicDouble2.get() > 0.0d);
            Assert.assertTrue(atomicDouble.get() > 0.0d);
            mockPulsar.cleanup();
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            build.shutdown();
        } catch (Throwable th) {
            mockPulsar.cleanup();
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            build.shutdown();
            throw th;
        }
    }

    private Metrics getMetric(PulsarService pulsarService, String str) {
        BrokerService brokerService = pulsarService.getBrokerService();
        brokerService.updateRates();
        for (Metrics metrics : brokerService.getTopicMetrics()) {
            if (str.equalsIgnoreCase(metrics.getDimension("metric"))) {
                return metrics;
            }
        }
        return null;
    }
}
