package org.apache.curator.framework.recipes.queue;

import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math.stat.descriptive.SummaryStatistics;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/curator/framework/recipes/queue/TestQueueSharder.class */
public class TestQueueSharder extends BaseClassForTests {
    @Test
    public void testDistribution() throws Exception {
        Timing timing = new Timing();
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(this.server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        QueueSharder queueSharder = null;
        try {
            newClient.start();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            queueSharder = new QueueSharder(newClient, makeAllocator(new QueueConsumer<String>() { // from class: org.apache.curator.framework.recipes.queue.TestQueueSharder.1
                public void consumeMessage(String str) throws Exception {
                    countDownLatch.await();
                }

                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                }
            }), "/queues", "/leader", QueueSharderPolicies.builder().newQueueThreshold(100).thresholdCheckMs(1).build());
            queueSharder.start();
            for (int i = 0; i < 1000; i++) {
                queueSharder.getQueue().put(Integer.toString(i));
                Thread.sleep(5L);
            }
            timing.forWaiting().sleepABit();
            SummaryStatistics summaryStatistics = new SummaryStatistics();
            Iterator it = queueSharder.getQueuePaths().iterator();
            while (it.hasNext()) {
                int numChildren = ((Stat) newClient.checkExists().forPath((String) it.next())).getNumChildren();
                Assertions.assertTrue(numChildren > 0);
                Assertions.assertTrue(((double) numChildren) >= 10.0d);
                summaryStatistics.addValue(numChildren);
            }
            countDownLatch.countDown();
            Assertions.assertTrue(summaryStatistics.getMean() >= 90.0d);
            timing.sleepABit();
            CloseableUtils.closeQuietly(queueSharder);
            CloseableUtils.closeQuietly(newClient);
        } catch (Throwable th) {
            timing.sleepABit();
            CloseableUtils.closeQuietly(queueSharder);
            CloseableUtils.closeQuietly(newClient);
            throw th;
        }
    }

    @Test
    public void testSharderWatchSync() throws Exception {
        Timing timing = new Timing();
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(this.server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        QueueAllocator<String, DistributedQueue<String>> makeAllocator = makeAllocator(makeConsumer(null));
        QueueSharderPolicies build = QueueSharderPolicies.builder().newQueueThreshold(2).thresholdCheckMs(1).build();
        QueueSharder queueSharder = new QueueSharder(newClient, makeAllocator, "/queues", "/leader", build);
        QueueSharder queueSharder2 = new QueueSharder(newClient, makeAllocator, "/queues", "/leader", build);
        try {
            newClient.start();
            queueSharder.start();
            queueSharder2.start();
            for (int i = 0; i < 20; i++) {
                queueSharder.getQueue().put(Integer.toString(i));
            }
            timing.sleepABit();
            Assertions.assertTrue(queueSharder.getShardQty() > 1 || queueSharder2.getShardQty() > 1);
            timing.forWaiting().sleepABit();
            Assertions.assertEquals(queueSharder.getShardQty(), queueSharder2.getShardQty());
            timing.sleepABit();
            CloseableUtils.closeQuietly(queueSharder);
            CloseableUtils.closeQuietly(queueSharder2);
            CloseableUtils.closeQuietly(newClient);
        } catch (Throwable th) {
            timing.sleepABit();
            CloseableUtils.closeQuietly(queueSharder);
            CloseableUtils.closeQuietly(queueSharder2);
            CloseableUtils.closeQuietly(newClient);
            throw th;
        }
    }

    @Test
    public void testSimpleDistributedQueue() throws Exception {
        Timing timing = new Timing();
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(this.server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        BlockingQueueConsumer<String> makeConsumer = makeConsumer(countDownLatch);
        QueueAllocator<String, DistributedQueue<String>> makeAllocator = makeAllocator(makeConsumer);
        QueueSharderPolicies build = QueueSharderPolicies.builder().newQueueThreshold(2).thresholdCheckMs(1).build();
        QueueSharder queueSharder = new QueueSharder(newClient, makeAllocator, "/queues", "/leader", build);
        try {
            newClient.start();
            queueSharder.start();
            queueSharder.getQueue().put("one");
            queueSharder.getQueue().put("two");
            queueSharder.getQueue().put("three");
            queueSharder.getQueue().put("four");
            countDownLatch.countDown();
            timing.sleepABit();
            queueSharder.getQueue().put("five");
            queueSharder.getQueue().put("six");
            queueSharder.getQueue().put("seven");
            queueSharder.getQueue().put("eight");
            timing.sleepABit();
            Assertions.assertTrue(queueSharder.getShardQty() > 1);
            HashSet newHashSet = Sets.newHashSet();
            for (int i = 0; i < 8; i++) {
                String str = (String) makeConsumer.take(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
                Assertions.assertNotNull(str);
                newHashSet.add(str);
            }
            Assertions.assertEquals(newHashSet, Sets.newHashSet(new String[]{"one", "two", "three", "four", "five", "six", "seven", "eight"}));
            int shardQty = queueSharder.getShardQty();
            queueSharder.close();
            queueSharder = new QueueSharder(newClient, makeAllocator, "/queues", "/leader", build);
            queueSharder.start();
            Assertions.assertEquals(queueSharder.getShardQty(), shardQty);
            CloseableUtils.closeQuietly(queueSharder);
            CloseableUtils.closeQuietly(newClient);
        } catch (Throwable th) {
            CloseableUtils.closeQuietly(queueSharder);
            CloseableUtils.closeQuietly(newClient);
            throw th;
        }
    }

    private QueueAllocator<String, DistributedQueue<String>> makeAllocator(final QueueConsumer<String> queueConsumer) {
        final QueueSerializer<String> queueSerializer = new QueueSerializer<String>() { // from class: org.apache.curator.framework.recipes.queue.TestQueueSharder.2
            public byte[] serialize(String str) {
                return str.getBytes();
            }

            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public String m22deserialize(byte[] bArr) {
                return new String(bArr);
            }
        };
        return new QueueAllocator<String, DistributedQueue<String>>() { // from class: org.apache.curator.framework.recipes.queue.TestQueueSharder.3
            /* renamed from: allocateQueue, reason: merged with bridge method [inline-methods] */
            public DistributedQueue<String> m23allocateQueue(CuratorFramework curatorFramework, String str) {
                return QueueBuilder.builder(curatorFramework, queueConsumer, queueSerializer, str).buildQueue();
            }
        };
    }

    private BlockingQueueConsumer<String> makeConsumer(final CountDownLatch countDownLatch) {
        return new BlockingQueueConsumer<String>(new ConnectionStateListener() { // from class: org.apache.curator.framework.recipes.queue.TestQueueSharder.4
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            }
        }) { // from class: org.apache.curator.framework.recipes.queue.TestQueueSharder.5
            public void consumeMessage(String str) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.await();
                }
                super.consumeMessage(str);
            }
        };
    }
}
