package org.apache.kylin.common.persistence.transaction;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.AuditLog;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.persistence.StringEntity;
import org.apache.kylin.common.persistence.metadata.AuditLogStore;
import org.apache.kylin.common.persistence.transaction.AbstractAuditLogReplayWorker;
import org.apache.kylin.common.persistence.transaction.AuditLogReplayWorker;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TestUtils;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.apache.kylin.junit.JdbcInfo;
import org.apache.kylin.junit.annotation.JdbcMetadataInfo;
import org.apache.kylin.junit.annotation.MetadataInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.transaction.InvalidTimeoutException;
import org.springframework.transaction.TransactionUsageException;

@MetadataInfo(onlyProps = true)
@JdbcMetadataInfo
/* loaded from: input_file:org/apache/kylin/common/persistence/transaction/AuditReplayWorkerTest.class */
public class AuditReplayWorkerTest {

    /** Class logger. NOTE(review): the {@code @Generated} marker suggests a de-lomboked {@code @Slf4j} field — confirm. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AuditReplayWorkerTest.class);

    /** Value written into the {@code instance} column of the audit-log rows inserted by {@code testRestoreSmallId}. */
    private static final String LOCAL_INSTANCE = "127.0.0.1";

    /**
     * Charset used to encode the {@code meta_content} bytes inserted by {@code testRestoreSmallId}.
     * Pinned to UTF-8 (instead of the platform default) so the test data is identical on every
     * machine and consistent with the other tests in this class, which already use UTF-8.
     */
    private final Charset charset = StandardCharsets.UTF_8;

    /**
     * Verifies that each call to {@code startSchedule} leaves the audit log store reporting the
     * offset that was passed in (2, then 3), for both values of the boolean flag.
     *
     * @throws IOException if closing the audit log store fails
     */
    @Test
    void testStartSchedule() throws IOException {
        ResourceStore metaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        // The entity and serializer are passed as-is; casting them to String types cannot compile.
        metaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG,
                new StringEntity(RandomUtil.randomUUIDStr()), StringEntity.serializer);
        AuditLogStore auditLogStore = metaStore.getAuditLogStore();
        try {
            AuditLogReplayWorker worker = (AuditLogReplayWorker) ReflectionTestUtils.getField(auditLogStore,
                    "replayWorker");
            Assertions.assertNotNull(worker);

            worker.startSchedule(2L, true);
            Assertions.assertEquals(2L, auditLogStore.getLogOffset());

            worker.startSchedule(3L, false);
            Assertions.assertEquals(3L, auditLogStore.getLogOffset());
        } finally {
            // Close even when an assertion fails so the store is not left open for later tests.
            auditLogStore.close();
        }
    }

    /**
     * Inserts two audit-log rows with ids 900 and 4 straight into the audit-log table, calls
     * {@code restore(3)} and expects the store to report a log offset of 900 afterwards.
     *
     * @param jdbcInfo JDBC handle injected by the test extension; supplies the {@link JdbcTemplate}
     * @throws IOException if closing the audit log store fails
     */
    @Test
    void testRestoreSmallId(JdbcInfo jdbcInfo) throws IOException {
        ResourceStore metaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        // The entity and serializer are passed as-is; casting them to String types cannot compile.
        metaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG,
                new StringEntity(RandomUtil.randomUUIDStr()), StringEntity.serializer);
        AuditLogStore auditLogStore = metaStore.getAuditLogStore();
        try {
            JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
            StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
            String unitId = RandomUtil.randomUUIDStr();

            String auditLogTable = metadataUrl.getIdentifier() + "_audit_log";
            String insertSql = String.format(Locale.ROOT,
                    "insert into %s (id, meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance) values (?, ?, ?, ?, ?, ?, ?, ?)",
                    auditLogTable);
            // Column order: id, meta_key, meta_content, meta_ts, meta_mvcc, unit_id, operator, instance.
            List<Object[]> rows = Arrays.asList(
                    new Object[] { 900, "/_global/p1/abc", "abc".getBytes(this.charset),
                            System.currentTimeMillis(), 0, unitId, null, LOCAL_INSTANCE },
                    new Object[] { 4, "/_global/p1/abc2", "abc".getBytes(this.charset),
                            System.currentTimeMillis(), 0, unitId, null, LOCAL_INSTANCE });
            jdbcTemplate.batchUpdate(insertSql, rows);

            auditLogStore.restore(3L);
            Assertions.assertEquals(900L, auditLogStore.getLogOffset());
        } finally {
            // Close even when an assertion fails so the store is not left open for later tests.
            auditLogStore.close();
        }
    }

    /**
     * Checks that {@code updateOffset} does not move the log offset backwards: after setting it
     * to 101, a later call with the smaller value 99 leaves the reported offset at 101.
     *
     * @throws IOException if closing the audit log store fails
     */
    @Test
    void testRestoreUpdateOffset() throws IOException {
        AuditLogStore auditLogStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig()).getAuditLogStore();
        try {
            AuditLogReplayWorker worker = (AuditLogReplayWorker) ReflectionTestUtils.getField(auditLogStore,
                    "replayWorker");
            Assertions.assertNotNull(worker);

            worker.updateOffset(101L);
            Assertions.assertEquals(101L, auditLogStore.getLogOffset());

            // A smaller offset must be ignored.
            worker.updateOffset(99L);
            Assertions.assertEquals(101L, auditLogStore.getLogOffset());
        } finally {
            // Close even when an assertion fails so the store is not left open for later tests.
            auditLogStore.close();
        }
    }

    /**
     * With the log offset set to 101, expects the non-public {@code hasCatch} method (invoked
     * reflectively) to return {@code true} for 100 and {@code false} for 102.
     *
     * @throws IOException if closing the audit log store fails
     */
    @Test
    void testRestoreHasCatchUp() throws IOException {
        AuditLogStore auditLogStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig()).getAuditLogStore();
        try {
            AuditLogReplayWorker worker = (AuditLogReplayWorker) ReflectionTestUtils.getField(auditLogStore,
                    "replayWorker");
            Assertions.assertNotNull(worker);
            worker.updateOffset(101L);
            Assertions.assertEquals(101L, auditLogStore.getLogOffset());

            Boolean caughtUpBelowOffset = ReflectionTestUtils.invokeMethod(worker, "hasCatch", 100L);
            Assertions.assertNotNull(caughtUpBelowOffset);
            Assertions.assertTrue(caughtUpBelowOffset);

            Boolean caughtUpAboveOffset = ReflectionTestUtils.invokeMethod(worker, "hasCatch", 102L);
            Assertions.assertNotNull(caughtUpAboveOffset);
            Assertions.assertFalse(caughtUpAboveOffset);
        } finally {
            // Close even when an assertion fails so the store is not left open for later tests.
            auditLogStore.close();
        }
    }

    /**
     * Flips the worker's {@code isStopped} flag to {@code true}, invokes {@code catchupInternal(1)}
     * reflectively and expects the log offset to remain at the previously set value 101.
     *
     * @throws IOException if closing the audit log store fails
     */
    @Test
    void testCatchupInternal_Stopped() throws IOException {
        AuditLogStore auditLogStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig()).getAuditLogStore();
        try {
            AuditLogReplayWorker worker = (AuditLogReplayWorker) ReflectionTestUtils.getField(auditLogStore,
                    "replayWorker");
            Assertions.assertNotNull(worker);
            worker.updateOffset(101L);
            Assertions.assertEquals(101L, auditLogStore.getLogOffset());

            AtomicBoolean stoppedFlag = (AtomicBoolean) ReflectionTestUtils.getField(worker, "isStopped");
            Assertions.assertNotNull(stoppedFlag);
            stoppedFlag.set(true);

            ReflectionTestUtils.invokeMethod(worker, "catchupInternal", 1);
            Assertions.assertEquals(101L, auditLogStore.getLogOffset());
        } finally {
            // Close even when an assertion fails so the store is not left open for later tests.
            auditLogStore.close();
        }
    }

    /**
     * Stubs {@code catchupToMaxId(1L)} on a spy of the worker to throw an
     * {@link InvalidTimeoutException} and expects the reflective call to surface it as a
     * {@link TransactionUsageException} carrying the message {@code "xxxx"}.
     *
     * <p>NOTE(review): despite the test name, the method invoked here is {@code catchupToMaxId},
     * not {@code catchupInternal} — confirm this is intended.
     *
     * @throws IOException if closing the audit log store fails
     */
    @Test
    void testCatchupInternal_TransactionUsageException() throws IOException {
        AuditLogStore auditLogStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig()).getAuditLogStore();
        try {
            AuditLogReplayWorker worker = (AuditLogReplayWorker) ReflectionTestUtils.getField(auditLogStore,
                    "replayWorker");
            Assertions.assertNotNull(worker);
            worker.updateOffset(101L);
            Assertions.assertEquals(101L, auditLogStore.getLogOffset());

            AtomicBoolean stoppedFlag = (AtomicBoolean) ReflectionTestUtils.getField(worker, "isStopped");
            Assertions.assertNotNull(stoppedFlag);
            stoppedFlag.set(false);

            AuditLogReplayWorker spiedWorker = Mockito.spy(worker);
            Mockito.doThrow(new InvalidTimeoutException("xxxx", 123)).when(spiedWorker).catchupToMaxId(1L);

            // assertThrows fails the test when nothing is thrown, replacing the try/fail/catch idiom.
            TransactionUsageException thrown = Assertions.assertThrows(TransactionUsageException.class,
                    () -> ReflectionTestUtils.invokeMethod(spiedWorker, "catchupToMaxId", 1L));
            Assertions.assertEquals("xxxx", thrown.getMessage());
        } finally {
            // Close even when an assertion fails so the store is not left open for later tests.
            auditLogStore.close();
        }
    }

    /**
     * Puts one item into the spy's {@code delayIdQueue}, stubs {@code catchupToMaxId(0L)} to throw
     * an {@link IllegalArgumentException} and {@code handleReloadAll} to do nothing, then expects
     * {@code catchupInternal(0)} to leave the queue empty.
     */
    @Test
    void testCatchupInternal_OtherException() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            AuditLogReplayWorker spiedWorker = Mockito.spy(worker);

            // The field is declared elsewhere; the element type is the one this test itself inserts.
            @SuppressWarnings("unchecked")
            ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem> delayIdQueue =
                    (ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem>) ReflectionTestUtils
                            .getField(spiedWorker, "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(1L, 123L));
            Assertions.assertFalse(delayIdQueue.isEmpty());

            IllegalArgumentException failure = new IllegalArgumentException("xxxx");
            Mockito.doThrow(failure).when(spiedWorker).catchupToMaxId(0L);
            Mockito.doNothing().when(spiedWorker).handleReloadAll(failure);

            spiedWorker.catchupInternal(0);
            Assertions.assertTrue(delayIdQueue.isEmpty());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Invokes {@code collectReplayDelayedId(1)} on a fresh worker and expects its
     * {@code delayIdQueue} to be empty afterwards.
     */
    @Test
    void testCollectReplayDelayedId_EmptyQueue() {
        // The helper already asserts that the worker is non-null.
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            ReflectionTestUtils.invokeMethod(worker, "collectReplayDelayedId", 1);

            // Only emptiness is checked, so a wildcard element type is enough.
            ConcurrentLinkedQueue<?> delayIdQueue = (ConcurrentLinkedQueue<?>) ReflectionTestUtils.getField(worker,
                    "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            Assertions.assertTrue(CollectionUtils.isEmpty(delayIdQueue));
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Queues two delayed ids (1 and 2) stamped with the current time and expects
     * {@code collectReplayDelayedId(10)} to return them in insertion order.
     */
    @Test
    void testCollectReplayDelayedId_NotEmptyQueue() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            // The field is declared elsewhere; the element type is the one this test itself inserts.
            @SuppressWarnings("unchecked")
            ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem> delayIdQueue =
                    (ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem>) ReflectionTestUtils
                            .getField(worker, "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(1L, System.currentTimeMillis()));
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(2L, System.currentTimeMillis()));

            List<Long> collectedIds = ReflectionTestUtils.invokeMethod(worker, "collectReplayDelayedId", 10);
            Assertions.assertEquals(Arrays.asList(1L, 2L), collectedIds);
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Queues three delayed ids and expects {@code collectReplayDelayedId(2)} to return only the
     * first two while the queue itself still holds all three items.
     */
    @Test
    void testCollectReplayDelayedId_MaxCount() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            // The field is declared elsewhere; the element type is the one this test itself inserts.
            @SuppressWarnings("unchecked")
            ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem> delayIdQueue =
                    (ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem>) ReflectionTestUtils
                            .getField(worker, "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(1L, System.currentTimeMillis()));
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(2L, System.currentTimeMillis()));
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(3L, System.currentTimeMillis()));

            List<Long> collectedIds = ReflectionTestUtils.invokeMethod(worker, "collectReplayDelayedId", 2);
            Assertions.assertEquals(Arrays.asList(1L, 2L), collectedIds);
            Assertions.assertEquals(3, delayIdQueue.size());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Queues three delayed ids where the first is stamped two timeout periods in the past, then
     * expects {@code collectReplayDelayedId(2)} to return ids 1 and 2 and the queue to shrink to
     * two items. Presumably the timed-out item is the one removed — verify against the worker.
     */
    @Test
    void testCollectReplayDelayedId_Timeout() {
        long itemTimeout = TestUtils.getTestConfig().getEventualReplayDelayItemTimeout();
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            // The field is declared elsewhere; the element type is the one this test itself inserts.
            @SuppressWarnings("unchecked")
            ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem> delayIdQueue =
                    (ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem>) ReflectionTestUtils
                            .getField(worker, "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            // First item is older than the configured timeout; the other two are fresh.
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(1L,
                    System.currentTimeMillis() - (itemTimeout * 2)));
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(2L, System.currentTimeMillis()));
            delayIdQueue.add(new AuditLogReplayWorker.AuditIdTimeItem(3L, System.currentTimeMillis()));

            List<Long> collectedIds = ReflectionTestUtils.invokeMethod(worker, "collectReplayDelayedId", 2);
            Assertions.assertEquals(Arrays.asList(1L, 2L), collectedIds);
            Assertions.assertEquals(2, delayIdQueue.size());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Exercises the non-public {@code waitMaxIdOk} method: expects {@code true} for (100, 100),
     * {@code false} for (101, 100), and {@code true} for (101, 100) once the property
     * {@code kylin.auditlog.replay-need-consecutive-log} has been set to {@code "false"}.
     */
    @Test
    void testWaitMaxIdOk() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            Boolean equalIds = ReflectionTestUtils.invokeMethod(worker, "waitMaxIdOk", 100L, 100L);
            Assertions.assertNotNull(equalIds);
            Assertions.assertTrue(equalIds);

            Boolean differingIds = ReflectionTestUtils.invokeMethod(worker, "waitMaxIdOk", 101L, 100L);
            Assertions.assertNotNull(differingIds);
            Assertions.assertFalse(differingIds);

            // Same arguments as above, but with the consecutive-log requirement switched off.
            TestUtils.getTestConfig().setProperty("kylin.auditlog.replay-need-consecutive-log", "false");
            Boolean differingIdsRelaxed = ReflectionTestUtils.invokeMethod(worker, "waitMaxIdOk", 101L, 100L);
            Assertions.assertNotNull(differingIdsRelaxed);
            Assertions.assertTrue(differingIdsRelaxed);
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Calls {@code recordStepAbsentIdList} with a window of (100, 150) and an empty audit-log list
     * and expects {@code delayIdQueue} to stay empty.
     */
    @Test
    void testRecordStepAbsentIdList_EmptyList() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            AbstractAuditLogReplayWorker.FixedWindow window = new AbstractAuditLogReplayWorker.FixedWindow(100L,
                    150L);
            // Only emptiness is checked, so a wildcard element type is enough.
            ConcurrentLinkedQueue<?> delayIdQueue = (ConcurrentLinkedQueue<?>) ReflectionTestUtils.getField(worker,
                    "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);

            ReflectionTestUtils.invokeMethod(worker, "recordStepAbsentIdList", window, Collections.emptyList());
            Assertions.assertTrue(delayIdQueue.isEmpty());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Calls {@code recordStepAbsentIdList} with a window of (100, 101) and a single audit log with
     * id 101, and expects {@code delayIdQueue} to stay empty.
     */
    @Test
    void testRecordStepAbsentIdList_SameLength() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            AbstractAuditLogReplayWorker.FixedWindow window = new AbstractAuditLogReplayWorker.FixedWindow(100L,
                    101L);
            // Only emptiness is checked, so a wildcard element type is enough.
            ConcurrentLinkedQueue<?> delayIdQueue = (ConcurrentLinkedQueue<?>) ReflectionTestUtils.getField(worker,
                    "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);

            AuditLog onlyLog = new AuditLog(101L, "adaasd",
                    ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)), 1L, 1L, null, null, null);
            ReflectionTestUtils.invokeMethod(worker, "recordStepAbsentIdList", window,
                    Collections.singletonList(onlyLog));
            Assertions.assertTrue(delayIdQueue.isEmpty());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Calls {@code recordStepAbsentIdList} with a window of (100, 104) and audit logs 101..103
     * whose timestamps lie two {@code idEarliestTimeoutMills} periods in the past, and expects
     * {@code delayIdQueue} to stay empty.
     */
    @Test
    void testRecordStepAbsentIdList_SkipTooOldAudit() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            AbstractAuditLogReplayWorker.FixedWindow window = new AbstractAuditLogReplayWorker.FixedWindow(100L,
                    104L);
            // Only emptiness is checked, so a wildcard element type is enough.
            ConcurrentLinkedQueue<?> delayIdQueue = (ConcurrentLinkedQueue<?>) ReflectionTestUtils.getField(worker,
                    "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            Long earliestTimeoutMillis = (Long) ReflectionTestUtils.getField(worker, "idEarliestTimeoutMills");
            Assertions.assertNotNull(earliestTimeoutMillis);

            // One shared timestamp, twice the timeout in the past, for all three logs.
            Long staleTimestamp = System.currentTimeMillis() - (earliestTimeoutMillis * 2);
            List<AuditLog> staleLogs = Arrays.asList(
                    new AuditLog(101L, "adaasd", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)),
                            staleTimestamp, 1L, null, null, null),
                    new AuditLog(102L, "adaasd", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)),
                            staleTimestamp, 1L, null, null, null),
                    new AuditLog(103L, "adaasd", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)),
                            staleTimestamp, 1L, null, null, null));

            ReflectionTestUtils.invokeMethod(worker, "recordStepAbsentIdList", window, staleLogs);
            Assertions.assertTrue(delayIdQueue.isEmpty());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Calls {@code recordStepAbsentIdList} with a window of (99, 104) and fresh audit logs
     * 101..103, and expects {@code delayIdQueue} to end up holding the ids 100 and 104.
     */
    @Test
    void testRecordStepAbsentIdList_CollectAbsentId() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            AbstractAuditLogReplayWorker.FixedWindow window = new AbstractAuditLogReplayWorker.FixedWindow(99L,
                    104L);
            // The queue must be typed: on a raw queue the stream elements are Object and
            // getAuditLogId() cannot be resolved.
            @SuppressWarnings("unchecked")
            ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem> delayIdQueue =
                    (ConcurrentLinkedQueue<AuditLogReplayWorker.AuditIdTimeItem>) ReflectionTestUtils
                            .getField(worker, "delayIdQueue");
            Assertions.assertNotNull(delayIdQueue);
            Assertions.assertNotNull(ReflectionTestUtils.getField(worker, "idEarliestTimeoutMills"));

            List<AuditLog> freshLogs = Arrays.asList(
                    new AuditLog(101L, "adaasd", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)),
                            System.currentTimeMillis(), 1L, null, null, null),
                    new AuditLog(102L, "adaasd", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)),
                            System.currentTimeMillis(), 1L, null, null, null),
                    new AuditLog(103L, "adaasd", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)),
                            System.currentTimeMillis(), 1L, null, null, null));
            ReflectionTestUtils.invokeMethod(worker, "recordStepAbsentIdList", window, freshLogs);

            List<Long> queuedIds = delayIdQueue.stream()
                    .map(AuditLogReplayWorker.AuditIdTimeItem::getAuditLogId)
                    .collect(Collectors.toList());
            Assertions.assertEquals(Arrays.asList(100L, 104L), queuedIds);
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Expects the non-public {@code findAbsentId} method to return a non-null, empty list when it
     * is given an empty id list and a window of (99, 104).
     */
    @Test
    void testFindAbsentId_EmptyList() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            List<?> absentIds = ReflectionTestUtils.invokeMethod(worker, "findAbsentId", Collections.emptyList(),
                    new AbstractAuditLogReplayWorker.FixedWindow(99L, 104L));
            Assertions.assertNotNull(absentIds);
            Assertions.assertTrue(absentIds.isEmpty());
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Expects the non-public {@code findAbsentId} method to return [100, 102, 103] when it is
     * given the present ids [101, 104] and a window of (99, 104).
     */
    @Test
    void testFindAbsentId_CollectId() {
        AuditLogReplayWorker worker = getAuditLogReplayWorker();
        try {
            List<Long> presentIds = Arrays.asList(101L, 104L);
            List<?> absentIds = ReflectionTestUtils.invokeMethod(worker, "findAbsentId", presentIds,
                    new AbstractAuditLogReplayWorker.FixedWindow(99L, 104L));
            Assertions.assertNotNull(absentIds);
            Assertions.assertEquals(Arrays.asList(100L, 102L, 103L), absentIds);
        } finally {
            // Close even when an assertion fails so the worker is not left running for later tests.
            worker.close(true);
        }
    }

    /**
     * Stores a random UUID under {@code ResourceStore.METASTORE_UUID_TAG} in the test metadata
     * store, then reads the {@code replayWorker} field of that store's audit log store via
     * reflection.
     *
     * @return the replay worker; never {@code null}, because the method asserts non-nullness
     */
    private AuditLogReplayWorker getAuditLogReplayWorker() {
        ResourceStore metaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        // The entity and serializer are passed as-is; casting them to String types cannot compile.
        metaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG,
                new StringEntity(RandomUtil.randomUUIDStr()), StringEntity.serializer);
        Object replayWorker = ReflectionTestUtils.getField(metaStore.getAuditLogStore(), "replayWorker");
        Assertions.assertNotNull(replayWorker);
        return (AuditLogReplayWorker) replayWorker;
    }
}
