package org.apache.kylin.common.persistence.transaction;

import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.UnitMessages;
import org.apache.kylin.common.persistence.event.Event;
import org.apache.kylin.common.persistence.event.ResourceCreateOrUpdateEvent;
import org.apache.kylin.common.persistence.event.ResourceDeleteEvent;
import org.apache.kylin.common.util.TestUtils;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.apache.kylin.junit.annotation.MetadataInfo;
import org.apache.kylin.junit.annotation.OverwriteProp;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

@MetadataInfo(onlyProps = true)
/* loaded from: input_file:org/apache/kylin/common/persistence/transaction/MessageSynchronizationTest.class */
public class MessageSynchronizationTest {
    private final Charset charset = Charset.defaultCharset();

    @Test
    public void replayTest() {
        MessageSynchronization.getInstance(TestUtils.getTestConfig()).replayInTransaction(new UnitMessages(createEvents()));
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        Assert.assertEquals(1L, kylinMetaStore.getResource("/default/abc.json").getMvcc());
        Assert.assertNull(kylinMetaStore.getResource("/default/abc3.json"));
    }

    @OverwriteProp(key = "kylin.server.mode", value = "query")
    @Test
    public void testKE19979() throws InterruptedException {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        ResourceCreateOrUpdateEvent resourceCreateOrUpdateEvent = new ResourceCreateOrUpdateEvent(new RawResource("/default/abc.json", ByteSource.wrap("version1".getBytes(this.charset)), 0L, atomicInteger.get()));
        final MessageSynchronization messageSynchronization = MessageSynchronization.getInstance(TestUtils.getTestConfig());
        messageSynchronization.replayInTransaction(new UnitMessages(Lists.newArrayList(resourceCreateOrUpdateEvent)));
        final ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1000);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1000);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.kylin.common.persistence.transaction.MessageSynchronizationTest.1
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.await();
                while (countDownLatch2.getCount() > 0) {
                    messageSynchronization.replayInTransaction(new UnitMessages(Lists.newArrayList(new ResourceCreateOrUpdateEvent(new RawResource("/default/abc.json", ByteSource.wrap("version2".getBytes(MessageSynchronizationTest.this.charset)), 0L, atomicInteger.incrementAndGet())))));
                    countDownLatch2.countDown();
                }
            }
        });
        Thread thread2 = new Thread(new Runnable() { // from class: org.apache.kylin.common.persistence.transaction.MessageSynchronizationTest.2
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.await();
                while (countDownLatch3.getCount() > 0) {
                    if (null == kylinMetaStore.getResource("/default/abc.json")) {
                        atomicInteger2.incrementAndGet();
                    }
                    countDownLatch3.countDown();
                }
            }
        });
        thread.start();
        thread2.start();
        countDownLatch.countDown();
        countDownLatch2.await();
        countDownLatch3.await();
        Assert.assertEquals(0L, atomicInteger2.get());
    }

    private List<Event> createEvents() {
        return (List) Lists.newArrayList(new ResourceCreateOrUpdateEvent(new RawResource("/default/abc.json", ByteSource.wrap("version1".getBytes(this.charset)), 0L, 0L)), new ResourceCreateOrUpdateEvent(new RawResource("/default/abc2.json", ByteSource.wrap("abc2".getBytes(this.charset)), 0L, 0L)), new ResourceCreateOrUpdateEvent(new RawResource("/default/abc.json", ByteSource.wrap("version2".getBytes(this.charset)), 0L, 1L)), new ResourceCreateOrUpdateEvent(new RawResource("/default/abc3.json", ByteSource.wrap("42".getBytes(this.charset)), 0L, 0L)), new ResourceDeleteEvent("/default/abc3.json")).stream().peek(resourceRelatedEvent -> {
            resourceRelatedEvent.setKey("default");
        }).collect(Collectors.toList());
    }
}
