package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.class */
public class NonPersistentKeySharedSubscriptionTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) NonPersistentKeySharedSubscriptionTest.class);
    private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        Consumer<Integer> createConsumer = createConsumer("non-persistent://public/default/key_shared");
        try {
            createConsumer = createConsumer("non-persistent://public/default/key_shared");
            try {
                Consumer<Integer> createConsumer2 = createConsumer("non-persistent://public/default/key_shared");
                try {
                    Producer<Integer> createProducer = createProducer("non-persistent://public/default/key_shared");
                    try {
                        int i = 65536 >> 1;
                        int i2 = i >> 1;
                        int i3 = 0;
                        int i4 = 0;
                        int i5 = 0;
                        for (int i6 = 0; i6 < 10; i6++) {
                            for (String str : keys) {
                                int makeHash = Murmur3_32Hash.getInstance().makeHash(str.getBytes()) % 65536;
                                if (makeHash < i2) {
                                    i5++;
                                } else if (makeHash < i) {
                                    i4++;
                                } else {
                                    i3++;
                                }
                                createProducer.newMessage().key(str).value(Integer.valueOf(i6)).send();
                            }
                        }
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(new KeyValue<>(createConsumer, Integer.valueOf(i3)));
                        arrayList.add(new KeyValue<>(createConsumer, Integer.valueOf(i4)));
                        arrayList.add(new KeyValue<>(createConsumer2, Integer.valueOf(i5)));
                        receiveAndCheck(arrayList);
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        if (Collections.singletonList(createConsumer2).get(0) != null) {
                            createConsumer2.close();
                        }
                        if (Collections.singletonList(createConsumer).get(0) != null) {
                            createConsumer.close();
                        }
                    } catch (Throwable th) {
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        throw th;
                    }
                } finally {
                    if (Collections.singletonList(createConsumer2).get(0) != null) {
                        createConsumer2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(createConsumer).get(0) != null) {
                    createConsumer.close();
                }
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(createConsumer).get(0) != null) {
                createConsumer.close();
            }
            throw th2;
        }
    }

    @Test
    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
        this.conf.setSubscriptionKeySharedEnable(true);
        Consumer<Integer> createConsumer = createConsumer("non-persistent://public/default/key_shared_consumer_crash");
        try {
            createConsumer = createConsumer("non-persistent://public/default/key_shared_consumer_crash");
            try {
                Consumer<Integer> createConsumer2 = createConsumer("non-persistent://public/default/key_shared_consumer_crash");
                try {
                    Producer<Integer> createProducer = createProducer("non-persistent://public/default/key_shared_consumer_crash");
                    try {
                        int i = 65536 >> 1;
                        int i2 = i >> 1;
                        int i3 = 0;
                        int i4 = 0;
                        int i5 = 0;
                        for (int i6 = 0; i6 < 10; i6++) {
                            for (String str : keys) {
                                int makeHash = Murmur3_32Hash.getInstance().makeHash(str.getBytes()) % 65536;
                                if (makeHash < i2) {
                                    i5++;
                                } else if (makeHash < i) {
                                    i4++;
                                } else {
                                    i3++;
                                }
                                createProducer.newMessage().key(str).value(Integer.valueOf(i6)).send();
                            }
                        }
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(new KeyValue<>(createConsumer, Integer.valueOf(i3)));
                        arrayList.add(new KeyValue<>(createConsumer, Integer.valueOf(i4)));
                        arrayList.add(new KeyValue<>(createConsumer2, Integer.valueOf(i5)));
                        receiveAndCheck(arrayList);
                        Thread.sleep(1000L);
                        createConsumer.close();
                        createConsumer.close();
                        for (int i7 = 0; i7 < 10; i7++) {
                            Iterator<String> it = keys.iterator();
                            while (it.hasNext()) {
                                createProducer.newMessage().key(it.next()).value(Integer.valueOf(i7)).send();
                            }
                        }
                        ArrayList arrayList2 = new ArrayList();
                        arrayList2.add(new KeyValue<>(createConsumer2, 100));
                        receiveAndCheck(arrayList2);
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        if (Collections.singletonList(createConsumer2).get(0) != null) {
                            createConsumer2.close();
                        }
                        if (Collections.singletonList(createConsumer).get(0) != null) {
                            createConsumer.close();
                        }
                    } catch (Throwable th) {
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        throw th;
                    }
                } finally {
                    if (Collections.singletonList(createConsumer2).get(0) != null) {
                        createConsumer2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(createConsumer).get(0) != null) {
                    createConsumer.close();
                }
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(createConsumer).get(0) != null) {
                createConsumer.close();
            }
            throw th2;
        }
    }

    @Test
    public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        Consumer<Integer> createConsumer = createConsumer("non-persistent://public/default/key_shared_none_key");
        try {
            createConsumer = createConsumer("non-persistent://public/default/key_shared_none_key");
            try {
                Consumer<Integer> createConsumer2 = createConsumer("non-persistent://public/default/key_shared_none_key");
                try {
                    Producer<Integer> createProducer = createProducer("non-persistent://public/default/key_shared_none_key");
                    try {
                        int i = 65536 >> 1;
                        int i2 = i >> 1;
                        for (int i3 = 0; i3 < 100; i3++) {
                            createProducer.newMessage().value(Integer.valueOf(i3)).send();
                        }
                        int makeHash = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes()) % 65536;
                        ArrayList arrayList = new ArrayList();
                        if (makeHash < i2) {
                            arrayList.add(new KeyValue<>(createConsumer2, 100));
                        } else if (makeHash < i) {
                            arrayList.add(new KeyValue<>(createConsumer, 100));
                        } else {
                            arrayList.add(new KeyValue<>(createConsumer, 100));
                        }
                        receiveAndCheck(arrayList);
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        if (Collections.singletonList(createConsumer2).get(0) != null) {
                            createConsumer2.close();
                        }
                        if (Collections.singletonList(createConsumer).get(0) != null) {
                            createConsumer.close();
                        }
                    } catch (Throwable th) {
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        throw th;
                    }
                } finally {
                    if (Collections.singletonList(createConsumer2).get(0) != null) {
                        createConsumer2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(createConsumer).get(0) != null) {
                    createConsumer.close();
                }
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(createConsumer).get(0) != null) {
                createConsumer.close();
            }
            throw th2;
        }
    }

    @Test
    public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        Consumer<Integer> createConsumer = createConsumer("non-persistent://public/default/key_shared_ordering_key");
        try {
            createConsumer = createConsumer("non-persistent://public/default/key_shared_ordering_key");
            try {
                Consumer<Integer> createConsumer2 = createConsumer("non-persistent://public/default/key_shared_ordering_key");
                try {
                    Producer<Integer> createProducer = createProducer("non-persistent://public/default/key_shared_ordering_key");
                    try {
                        int i = 65536 >> 1;
                        int i2 = i >> 1;
                        int i3 = 0;
                        int i4 = 0;
                        int i5 = 0;
                        for (int i6 = 0; i6 < 10; i6++) {
                            for (String str : keys) {
                                int makeHash = Murmur3_32Hash.getInstance().makeHash(str.getBytes()) % 65536;
                                if (makeHash < i2) {
                                    i5++;
                                } else if (makeHash < i) {
                                    i4++;
                                } else {
                                    i3++;
                                }
                                createProducer.newMessage().key("any key").orderingKey(str.getBytes()).value(Integer.valueOf(i6)).send();
                            }
                        }
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(new KeyValue<>(createConsumer, Integer.valueOf(i3)));
                        arrayList.add(new KeyValue<>(createConsumer, Integer.valueOf(i4)));
                        arrayList.add(new KeyValue<>(createConsumer2, Integer.valueOf(i5)));
                        receiveAndCheck(arrayList);
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        if (Collections.singletonList(createConsumer2).get(0) != null) {
                            createConsumer2.close();
                        }
                        if (Collections.singletonList(createConsumer).get(0) != null) {
                            createConsumer.close();
                        }
                    } catch (Throwable th) {
                        if (Collections.singletonList(createProducer).get(0) != null) {
                            createProducer.close();
                        }
                        throw th;
                    }
                } finally {
                    if (Collections.singletonList(createConsumer2).get(0) != null) {
                        createConsumer2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(createConsumer).get(0) != null) {
                    createConsumer.close();
                }
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(createConsumer).get(0) != null) {
                createConsumer.close();
            }
            throw th2;
        }
    }

    @Test(expectedExceptions = {PulsarClientException.class})
    public void testDisableKeySharedSubscription() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(false);
        this.pulsarClient.newConsumer().topic("persistent://public/default/key_shared_disabled").subscriptionName("key_shared").subscriptionType(SubscriptionType.Key_Shared).ackTimeout(10L, TimeUnit.SECONDS).subscribe();
    }

    private Producer<Integer> createProducer(String str) throws PulsarClientException {
        return this.pulsarClient.newProducer(Schema.INT32).topic(str).enableBatching(false).create();
    }

    private Consumer<Integer> createConsumer(String str) throws PulsarClientException {
        return this.pulsarClient.newConsumer(Schema.INT32).topic(str).subscriptionName("key_shared").subscriptionType(SubscriptionType.Key_Shared).ackTimeout(3L, TimeUnit.SECONDS).subscribe();
    }

    private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> list) throws PulsarClientException {
        HashMap hashMap = new HashMap();
        for (KeyValue<Consumer<Integer>, Integer> keyValue : list) {
            int i = 0;
            HashMap hashMap2 = new HashMap();
            for (Integer num = 0; num.intValue() < keyValue.getValue().intValue(); num = Integer.valueOf(num.intValue() + 1)) {
                Message<Integer> receive = keyValue.getKey().receive();
                keyValue.getKey().acknowledge(receive);
                String str = receive.hasOrderingKey() ? new String(receive.getOrderingKey()) : receive.getKey();
                log.info("[{}] Receive message key: {} value: {} messageId: {}", keyValue.getKey().getConsumerName(), str, receive.getValue(), receive.getMessageId());
                if (hashMap2.get(str) == null) {
                    Assert.assertNotNull(receive);
                } else {
                    Assert.assertTrue(receive.getValue().compareTo((Integer) ((Message) hashMap2.get(str)).getValue()) > 0);
                }
                hashMap2.put(str, receive);
                hashMap.putIfAbsent(keyValue.getKey(), Sets.newHashSet());
                ((Set) hashMap.get(keyValue.getKey())).add(str);
                i++;
            }
            Assert.assertEquals(keyValue.getValue().intValue(), i);
        }
        HashSet newHashSet = Sets.newHashSet();
        hashMap.forEach((consumer, set) -> {
            set.forEach(str2 -> {
                Assert.assertTrue(newHashSet.add(str2), "Key " + str2 + "is distributed to multiple consumers.");
            });
        });
    }
}
