package org.apache.pulsar.compaction;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-compaction"})
/* loaded from: input_file:org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.class */
public class ServiceUnitStateCompactionTest extends MockedPulsarServiceBaseTest {
    private ScheduledExecutorService compactionScheduler;
    private BookKeeper bk;
    private Schema<ServiceUnitStateData> schema;
    private ServiceUnitStateCompactionStrategy strategy;
    private ServiceUnitState testState = ServiceUnitState.Init;
    private ServiceUnitStateData testData = null;
    private static Random RANDOM = new Random();

    /* loaded from: input_file:org/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData.class */
    public static final class TestData extends Record {
        private final String topic;
        private final Map<String, ServiceUnitStateData> expected;
        private final List<Pair<String, ServiceUnitStateData>> all;

        public TestData(String str, Map<String, ServiceUnitStateData> map, List<Pair<String, ServiceUnitStateData>> list) {
            this.topic = str;
            this.expected = map;
            this.all = list;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, TestData.class), TestData.class, "topic;expected;all", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->topic:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->expected:Ljava/util/Map;", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->all:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, TestData.class), TestData.class, "topic;expected;all", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->topic:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->expected:Ljava/util/Map;", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->all:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, TestData.class, Object.class), TestData.class, "topic;expected;all", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->topic:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->expected:Ljava/util/Map;", "FIELD:Lorg/apache/pulsar/compaction/ServiceUnitStateCompactionTest$TestData;->all:Ljava/util/List;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String topic() {
            return this.topic;
        }

        public Map<String, ServiceUnitStateData> expected() {
            return this.expected;
        }

        public List<Pair<String, ServiceUnitStateData>> all() {
            return this.all;
        }
    }

    private ServiceUnitStateData testValue(ServiceUnitState serviceUnitState, String str) {
        if (serviceUnitState == ServiceUnitState.Init) {
            this.testData = null;
        } else {
            this.testData = new ServiceUnitStateData(serviceUnitState, str, versionId(this.testData) + 1);
        }
        return this.testData;
    }

    private ServiceUnitStateData testValue(String str) {
        this.testState = nextValidStateNonSplit(this.testState);
        return testValue(this.testState, str);
    }

    private ServiceUnitState nextValidState(ServiceUnitState serviceUnitState) {
        List list = (List) Arrays.stream(ServiceUnitState.values()).filter(serviceUnitState2 -> {
            return ServiceUnitState.isValidTransition(serviceUnitState, serviceUnitState2);
        }).collect(Collectors.toList());
        return (ServiceUnitState) list.get(RANDOM.nextInt(list.size()));
    }

    private ServiceUnitState nextValidStateNonSplit(ServiceUnitState serviceUnitState) {
        List list = (List) Arrays.stream(ServiceUnitState.values()).filter(serviceUnitState2 -> {
            return (serviceUnitState2 == ServiceUnitState.Init || serviceUnitState2 == ServiceUnitState.Splitting || serviceUnitState2 == ServiceUnitState.Deleted || !ServiceUnitState.isValidTransition(serviceUnitState, serviceUnitState2)) ? false : true;
        }).collect(Collectors.toList());
        return (ServiceUnitState) list.get(RANDOM.nextInt(list.size()));
    }

    private ServiceUnitState nextInvalidState(ServiceUnitState serviceUnitState) {
        List list = (List) Arrays.stream(ServiceUnitState.values()).filter(serviceUnitState2 -> {
            return !ServiceUnitState.isValidTransition(serviceUnitState, serviceUnitState2);
        }).collect(Collectors.toList());
        return list.size() == 0 ? ServiceUnitState.Init : (ServiceUnitState) list.get(RANDOM.nextInt(list.size()));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
        this.bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null);
        this.schema = Schema.JSON(ServiceUnitStateData.class);
        this.strategy = new ServiceUnitStateCompactionStrategy();
        this.strategy.checkBrokers(false);
        this.testState = ServiceUnitState.Init;
        this.testData = null;
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
        this.bk.close();
        if (this.compactionScheduler != null) {
            this.compactionScheduler.shutdownNow();
        }
    }

    TestData generateTestData() throws PulsarAdminException, PulsarClientException {
        this.admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1L));
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        Random random = new Random(0L);
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        for (int i = 0; i < 20; i++) {
            String str = "key" + random.nextInt(5);
            ServiceUnitStateData serviceUnitStateData = (ServiceUnitStateData) hashMap.get(str);
            ServiceUnitState state = ServiceUnitStateData.state(serviceUnitStateData);
            boolean nextBoolean = random.nextBoolean();
            ServiceUnitState nextInvalidState = nextBoolean ? nextInvalidState(state) : nextValidState(state);
            long versionId = versionId(serviceUnitStateData) + 1;
            ServiceUnitStateData serviceUnitStateData2 = nextBoolean ? new ServiceUnitStateData(nextInvalidState, str + ":" + i, false, versionId) : nextInvalidState == ServiceUnitState.Init ? new ServiceUnitStateData(nextInvalidState, str + ":" + i, true, versionId) : new ServiceUnitStateData(nextInvalidState, str + ":" + i, false, versionId);
            create.newMessage().key(str).value(serviceUnitStateData2).send();
            if (!this.strategy.shouldKeepLeft(serviceUnitStateData, serviceUnitStateData2)) {
                hashMap.put(str, serviceUnitStateData2);
            }
            arrayList.add(Pair.of(str, serviceUnitStateData2));
        }
        return new TestData("persistent://my-property/use/my-ns/my-topic1", hashMap, arrayList);
    }

    @Test
    public void testCompaction() throws Exception {
        TestData generateTestData = generateTestData();
        String str = generateTestData.topic;
        Map<String, ServiceUnitStateData> map = generateTestData.expected;
        List<Pair<String, ServiceUnitStateData>> list = generateTestData.all;
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(str, this.strategy).get();
        PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str, false);
        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
        Assert.assertFalse(internalStats.compactedLedger.offloaded);
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub1").readCompacted(true).subscribe();
        do {
            try {
                Message receive = subscribe.receive(2, TimeUnit.SECONDS);
                Assert.assertEquals(map.remove(receive.getKey()), receive.getValue());
            } finally {
            }
        } while (!map.isEmpty());
        Assert.assertTrue(map.isEmpty());
        if (subscribe != null) {
            subscribe.close();
        }
        subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub1").readCompacted(false).subscribe();
        do {
            try {
                Message receive2 = subscribe.receive(2, TimeUnit.SECONDS);
                Pair<String, ServiceUnitStateData> remove = list.remove(0);
                Assert.assertEquals((String) remove.getLeft(), receive2.getKey());
                Assert.assertEquals(remove.getRight(), receive2.getValue());
            } finally {
            }
        } while (!list.isEmpty());
        Assert.assertTrue(list.isEmpty());
        if (subscribe != null) {
            subscribe.close();
        }
    }

    @Test
    public void testCompactionWithReader() throws Exception {
        TestData generateTestData = generateTestData();
        String str = generateTestData.topic;
        Map<String, ServiceUnitStateData> map = generateTestData.expected;
        List<Pair<String, ServiceUnitStateData>> list = generateTestData.all;
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(str, this.strategy).get();
        Reader create = this.pulsarClient.newReader(this.schema).topic(str).readCompacted(true).startMessageId(MessageId.earliest).create();
        do {
            try {
                Message readNext = create.readNext(2, TimeUnit.SECONDS);
                Assert.assertEquals(map.remove(readNext.getKey()), readNext.getValue());
            } finally {
            }
        } while (!map.isEmpty());
        Assert.assertTrue(map.isEmpty());
        if (create != null) {
            create.close();
        }
        create = this.pulsarClient.newReader(this.schema).topic(str).readCompacted(false).startMessageId(MessageId.earliest).create();
        do {
            try {
                Message readNext2 = create.readNext(2, TimeUnit.SECONDS);
                Pair<String, ServiceUnitStateData> remove = list.remove(0);
                Assert.assertEquals((String) remove.getLeft(), readNext2.getKey());
                Assert.assertEquals(remove.getRight(), readNext2.getValue());
            } finally {
            }
        } while (!list.isEmpty());
        Assert.assertTrue(list.isEmpty());
        if (create != null) {
            create.close();
        }
    }

    @Test
    public void testCompactionWithTableview() throws Exception {
        TableView create = this.pulsar.getClient().newTableViewBuilder(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
        ((ServiceUnitStateCompactionStrategy) FieldUtils.readDeclaredField(create, "compactionStrategy", true)).checkBrokers(false);
        TestData generateTestData = generateTestData();
        String str = generateTestData.topic;
        Map<String, ServiceUnitStateData> map = generateTestData.expected;
        HashMap hashMap = new HashMap(map);
        Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(hashMap.size(), create.size());
        });
        for (Map.Entry entry : create.entrySet()) {
            Assert.assertEquals(hashMap.remove(entry.getKey()), entry.getValue());
            if (hashMap.isEmpty()) {
                break;
            }
        }
        Assert.assertTrue(hashMap.isEmpty());
        create.close();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(str, this.strategy).get();
        TableView create2 = this.pulsar.getClient().newTableView(this.schema).topic(str).loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
        for (Map.Entry entry2 : create2.entrySet()) {
            Assert.assertEquals(map.remove(entry2.getKey()), entry2.getValue());
            if (map.isEmpty()) {
                break;
            }
        }
        Assert.assertTrue(map.isEmpty());
        create2.close();
    }

    @Test
    public void testReadCompactedBeforeCompaction() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).create();
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        List asList = Arrays.asList(testValue("content0"), testValue("content1"), testValue("content2"));
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            create.newMessage().key("key0").value((ServiceUnitStateData) it.next()).send();
        }
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            Message receive = subscribe.receive();
            Assert.assertEquals(receive.getKey(), "key0");
            Assert.assertEquals(receive.getValue(), asList.get(0));
            Message receive2 = subscribe.receive();
            Assert.assertEquals(receive2.getKey(), "key0");
            Assert.assertEquals(receive2.getValue(), asList.get(1));
            Message receive3 = subscribe.receive();
            Assert.assertEquals(receive3.getKey(), "key0");
            Assert.assertEquals(receive3.getValue(), asList.get(2));
            if (subscribe != null) {
                subscribe.close();
            }
            new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
            subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
            try {
                Message receive4 = subscribe.receive();
                Assert.assertEquals(receive4.getKey(), "key0");
                Assert.assertEquals(receive4.getValue(), asList.get(2));
                if (subscribe != null) {
                    subscribe.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testReadEntriesAfterCompaction() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).create();
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        List asList = Arrays.asList(testValue("content0"), testValue("content1"), testValue("content2"), testValue("content3"));
        create.newMessage().key("key0").value((ServiceUnitStateData) asList.get(0)).send();
        create.newMessage().key("key0").value((ServiceUnitStateData) asList.get(1)).send();
        create.newMessage().key("key0").value((ServiceUnitStateData) asList.get(2)).send();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
        create.newMessage().key("key0").value((ServiceUnitStateData) asList.get(3)).send();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            Message receive = subscribe.receive();
            Assert.assertEquals(receive.getKey(), "key0");
            Assert.assertEquals(receive.getValue(), asList.get(2));
            Message receive2 = subscribe.receive();
            Assert.assertEquals(receive2.getKey(), "key0");
            Assert.assertEquals(receive2.getValue(), asList.get(3));
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testSeekEarliestAfterCompaction() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).create();
        List asList = Arrays.asList(testValue("content0"), testValue("content1"), testValue("content2"));
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            create.newMessage().key("key0").value((ServiceUnitStateData) it.next()).send();
        }
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            subscribe.seek(MessageId.earliest);
            Message receive = subscribe.receive();
            Assert.assertEquals(receive.getKey(), "key0");
            Assert.assertEquals(receive.getValue(), asList.get(2));
            if (subscribe != null) {
                subscribe.close();
            }
            subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(false).subscribe();
            try {
                subscribe.seek(MessageId.earliest);
                Message receive2 = subscribe.receive();
                Assert.assertEquals(receive2.getKey(), "key0");
                Assert.assertEquals(receive2.getValue(), asList.get(0));
                Message receive3 = subscribe.receive();
                Assert.assertEquals(receive3.getKey(), "key0");
                Assert.assertEquals(receive3.getValue(), asList.get(1));
                Message receive4 = subscribe.receive();
                Assert.assertEquals(receive4.getKey(), "key0");
                Assert.assertEquals(receive4.getValue(), asList.get(2));
                if (subscribe != null) {
                    subscribe.close();
                }
            } finally {
            }
        } finally {
        }
    }

    /* JADX WARN: Type inference failed for: r5v4, types: [java.lang.String, long] */
    @Test
    public void testSlowTableviewAfterCompaction() throws Exception {
        this.strategy.checkBrokers(true);
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        TableView create = this.pulsar.getClient().newTableViewBuilder(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").subscriptionName("fastTV").loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
        PulsarTestContext createAdditionalPulsarTestContext = createAdditionalPulsarTestContext(getDefaultConf());
        try {
            PulsarService pulsarService = createAdditionalPulsarTestContext.getPulsarService();
            TableView create2 = pulsarService.getClient().newTableViewBuilder(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").subscriptionName("slowTV").loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
            Semaphore semaphore = new Semaphore(0);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            create2.listen((str, serviceUnitStateData) -> {
                if (serviceUnitStateData.state() != ServiceUnitState.Assigning) {
                    if (serviceUnitStateData.state() == ServiceUnitState.Releasing) {
                        atomicBoolean.set(true);
                    }
                } else {
                    try {
                        atomicBoolean.set(false);
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            this.admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1L));
            Producer create3 = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            StrategicTwoPhaseCompactor strategicTwoPhaseCompactor = new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
            String str2 = "bundle1";
            String str3 = "broker0";
            String str4 = "broker1";
            TypedMessageBuilder key = create3.newMessage().key("bundle1");
            ServiceUnitState serviceUnitState = ServiceUnitState.Owned;
            long j = 1 + 1;
            key.value(new ServiceUnitStateData(str3, str3, 1L)).send();
            for (int i = 0; i < 3; i++) {
                ?? r5 = j;
                long j2 = r5 + 1;
                ServiceUnitStateData serviceUnitStateData2 = new ServiceUnitStateData(ServiceUnitState.Releasing, str4, (String) r5, (long) r5);
                create3.newMessage().key("bundle1").value(serviceUnitStateData2).send();
                create3.newMessage().key("bundle1").value(serviceUnitStateData2).send();
                String str5 = str3;
                long j3 = j2 + 1;
                ServiceUnitStateData serviceUnitStateData3 = new ServiceUnitStateData(ServiceUnitState.Assigning, str5, str5, j2);
                create3.newMessage().key("bundle1").value(serviceUnitStateData3).send();
                create3.newMessage().key("bundle1").value(serviceUnitStateData3).send();
                String str6 = str3;
                j = j3 + 1;
                ServiceUnitStateData serviceUnitStateData4 = new ServiceUnitStateData(ServiceUnitState.Owned, str6, str6, j3);
                create3.newMessage().key("bundle1").value(serviceUnitStateData4).send();
                create3.newMessage().key("bundle1").value(serviceUnitStateData4).send();
                strategicTwoPhaseCompactor.compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
                Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                    Assert.assertEquals(create.get(str2), serviceUnitStateData4);
                });
                Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                    Assert.assertEquals(create2.get(str2), serviceUnitStateData3);
                });
                Assert.assertTrue(!atomicBoolean.get());
                semaphore.release();
                Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                    Assert.assertEquals(create2.get(str2), serviceUnitStateData4);
                });
                TableView create4 = this.pulsar.getClient().newTableView(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
                Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                    Assert.assertEquals(create4.get(str2), serviceUnitStateData4);
                });
                str3 = str4;
                str4 = "broker" + (i + 2);
                create4.close();
            }
            create3.close();
            create2.close();
            create.close();
            pulsarService.close();
            if (Collections.singletonList(createAdditionalPulsarTestContext).get(0) != null) {
                createAdditionalPulsarTestContext.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(createAdditionalPulsarTestContext).get(0) != null) {
                createAdditionalPulsarTestContext.close();
            }
            throw th;
        }
    }

    @Test
    public void testSlowReceiveTableviewAfterCompaction() throws Exception {
        String str = "persistent://my-property/use/my-ns/my-topic1";
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        TableView create = this.pulsar.getClient().newTableViewBuilder(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").subscriptionName("slowTV").loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
        this.admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1L));
        Producer create2 = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        StrategicTwoPhaseCompactor strategicTwoPhaseCompactor = new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        ReaderImpl readerImpl = (ReaderImpl) ((CompletableFuture) FieldUtils.readDeclaredField(create, "reader", true)).get();
        ConsumerImpl consumerImpl = (ConsumerImpl) Mockito.spy(readerImpl.getConsumer());
        FieldUtils.writeDeclaredField(readerImpl, "consumer", consumerImpl, true);
        String str2 = "bundle1";
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(1);
        int i = 1000;
        ((ConsumerImpl) Mockito.doAnswer(invocationOnMock -> {
            if (atomicInteger2.decrementAndGet() != 0) {
                Mockito.reset(new ConsumerImpl[]{consumerImpl});
                return consumerImpl.receiveAsync();
            }
            CompletableFuture receiveAsync = consumerImpl.receiveAsync();
            for (int i2 = 0; i2 < i; i2++) {
                create2.newMessage().key(str2).value(new ServiceUnitStateData(ServiceUnitState.Owned, "broker" + atomicInteger.incrementAndGet(), true, atomicInteger.get())).send();
            }
            strategicTwoPhaseCompactor.compact(str, this.strategy).join();
            return receiveAsync;
        }).when(consumerImpl)).receiveAsync();
        create2.newMessage().key("bundle1").value(new ServiceUnitStateData(ServiceUnitState.Owned, "broker", true, atomicInteger.incrementAndGet())).send();
        create2.newMessage().key("bundle1").value(new ServiceUnitStateData(ServiceUnitState.Owned, "broker" + atomicInteger.incrementAndGet(), true, atomicInteger.get())).send();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            ServiceUnitStateData serviceUnitStateData = (ServiceUnitStateData) create.get(str2);
            Assert.assertNotNull(serviceUnitStateData);
            Assert.assertEquals(serviceUnitStateData.dstBroker(), "broker" + atomicInteger.get());
        });
        create2.close();
        create.close();
    }

    @Test
    public void testBrokerRestartAfterCompaction() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).create();
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        List asList = Arrays.asList(testValue("content0"), testValue("content1"), testValue("content2"));
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            create.newMessage().key("key0").value((ServiceUnitStateData) it.next()).send();
        }
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            Message receive = subscribe.receive();
            Assert.assertEquals(receive.getKey(), "key0");
            Assert.assertEquals(receive.getValue(), asList.get(asList.size() - 1));
            if (subscribe != null) {
                subscribe.close();
            }
            stopBroker();
            try {
                Consumer subscribe2 = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
                try {
                    subscribe2.receive();
                    Assert.fail("Shouldn't have been able to receive anything");
                    if (subscribe2 != null) {
                        subscribe2.close();
                    }
                } finally {
                }
            } catch (PulsarClientException e) {
            }
            startBroker();
            Consumer subscribe3 = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
            try {
                Message receive2 = subscribe3.receive();
                Assert.assertEquals(receive2.getKey(), "key0");
                Assert.assertEquals(receive2.getValue(), asList.get(asList.size() - 1));
                if (subscribe3 != null) {
                    subscribe3.close();
                }
            } catch (Throwable th) {
                if (subscribe3 != null) {
                    try {
                        subscribe3.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCompactEmptyTopic() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).create();
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
        ServiceUnitStateData testValue = testValue("content0");
        create.newMessage().key("key0").value(testValue).send();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            Message receive = subscribe.receive();
            Assert.assertEquals(receive.getKey(), "key0");
            Assert.assertEquals(receive.getValue(), testValue);
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testWholeBatchCompactedOut() throws Exception {
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            Producer create2 = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").maxPendingMessages(3).enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1L, TimeUnit.HOURS).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            try {
                create2.newMessage().key("key1").value(testValue("my-message-1")).sendAsync();
                create2.newMessage().key("key1").value(testValue("my-message-2")).sendAsync();
                create2.newMessage().key("key1").value(testValue("my-message-3")).sendAsync();
                create.newMessage().key("key1").value(testValue("my-message-4")).send();
                if (create2 != null) {
                    create2.close();
                }
                if (create != null) {
                    create.close();
                }
                new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
                Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
                try {
                    Message receive = subscribe.receive();
                    Assert.assertEquals(receive.getKey(), "key1");
                    Assert.assertEquals(new String(((ServiceUnitStateData) receive.getValue()).dstBroker()), "my-message-4");
                    if (subscribe != null) {
                        subscribe.close();
                    }
                } catch (Throwable th) {
                    if (subscribe != null) {
                        try {
                            subscribe.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (create2 != null) {
                    try {
                        create2.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    public void testCompactionWithLastDeletedKey() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        create.newMessage().key("1").value(testValue("1")).send();
        create.newMessage().key("2").value(testValue("3")).send();
        create.newMessage().key("3").value(testValue("5")).send();
        create.newMessage().key("1").value((Object) null).send();
        create.newMessage().key("2").value((Object) null).send();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
        HashSet newHashSet = Sets.newHashSet(new String[]{"3"});
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            Assert.assertTrue(newHashSet.remove(subscribe.receive(2, TimeUnit.SECONDS).getKey()));
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void testEmptyCompactionLedger() throws Exception {
        Producer create = this.pulsarClient.newProducer(this.schema).topic("persistent://my-property/use/my-ns/my-topic1").compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe().close();
        create.newMessage().key("1").value(testValue(ServiceUnitState.Owned, "1")).send();
        create.newMessage().key("2").value(testValue(ServiceUnitState.Owned, "3")).send();
        create.newMessage().key("1").value((Object) null).send();
        create.newMessage().key("2").value((Object) null).send();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1", this.strategy).get();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").readCompacted(true).subscribe();
        try {
            Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void testAllEmptyCompactionLedger() throws Exception {
        String str = "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(this.schema).compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE).topic(str);
        producerBuilder.batchingMaxMessages(2);
        Producer create = producerBuilder.create();
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.newMessage().key("1").value((Object) null).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler).compact(str, this.strategy).get();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub1").readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        try {
            Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 20000)
    public void testCompactMultipleTimesWithoutEmptyMessage() throws PulsarClientException, ExecutionException, InterruptedException {
        String str = "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID().toString();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(this.schema).topic(str);
        producerBuilder.compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE);
        producerBuilder.enableBatching(true);
        Producer create = producerBuilder.create();
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.newMessage().key("1").value(testValue(i)).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        StrategicTwoPhaseCompactor strategicTwoPhaseCompactor = new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        strategicTwoPhaseCompactor.compact(str, this.strategy).get();
        arrayList.clear();
        for (int i2 = 0; i2 < 10; i2++) {
            arrayList.add(create.newMessage().key("1").value(testValue((i2 + 10))).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        strategicTwoPhaseCompactor.compact(str, this.strategy).get();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub1").readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        try {
            Message receive = subscribe.receive();
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getKey(), "1");
            Assert.assertEquals(((ServiceUnitStateData) receive.getValue()).dstBroker(), "19");
            Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 200000)
    public void testReadUnCompacted() throws PulsarClientException, ExecutionException, InterruptedException {
        String str = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(this.schema).topic(str);
        producerBuilder.compressionType(ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE);
        producerBuilder.batchingMaxMessages(2);
        Producer create = producerBuilder.create();
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.newMessage().key("1").value(testValue(i)).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        StrategicTwoPhaseCompactor strategicTwoPhaseCompactor = new StrategicTwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        strategicTwoPhaseCompactor.compact(str, this.strategy).get();
        arrayList.clear();
        for (int i2 = 0; i2 < 10; i2++) {
            arrayList.add(create.newMessage().key("1").value(testValue((i2 + 10))).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        Consumer subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub1").readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        for (int i3 = 0; i3 < 11; i3++) {
            try {
                Message receive = subscribe.receive();
                Assert.assertNotNull(receive);
                Assert.assertEquals(receive.getKey(), "1");
                Assert.assertEquals(((ServiceUnitStateData) receive.getValue()).dstBroker(), (i3 + 9));
                subscribe.acknowledge(receive);
            } finally {
            }
        }
        Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
        if (subscribe != null) {
            subscribe.close();
        }
        create.newMessage().key("1").value((Object) null).send();
        strategicTwoPhaseCompactor.compact(str, this.strategy).get();
        subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub2").readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        try {
            Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
            if (subscribe != null) {
                subscribe.close();
            }
            for (int i4 = 0; i4 < 10; i4++) {
                arrayList.add(create.newMessage().key("1").value(testValue((i4 + 20))).sendAsync());
            }
            FutureUtil.waitForAll(arrayList).get();
            subscribe = this.pulsarClient.newConsumer(this.schema).topic(new String[]{str}).subscriptionName("sub3").readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            for (int i5 = 0; i5 < 10; i5++) {
                try {
                    Message receive2 = subscribe.receive();
                    Assert.assertNotNull(receive2);
                    Assert.assertEquals(receive2.getKey(), "1");
                    Assert.assertEquals(((ServiceUnitStateData) receive2.getValue()).dstBroker(), (i5 + 20));
                    subscribe.acknowledge(receive2);
                } finally {
                }
            }
            Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
            if (subscribe != null) {
                subscribe.close();
            }
        } finally {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th) {
                    th.addSuppressed(th);
                }
            }
        }
    }

    public static long versionId(ServiceUnitStateData serviceUnitStateData) {
        if (serviceUnitStateData == null) {
            return 0L;
        }
        return serviceUnitStateData.versionId();
    }
}
