package org.apache.kylin.common.persistence.metadata;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.AuditLog;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.persistence.StringEntity;
import org.apache.kylin.common.persistence.UnitMessages;
import org.apache.kylin.common.persistence.event.ResourceCreateOrUpdateEvent;
import org.apache.kylin.common.persistence.metadata.jdbc.AuditLogRowMapper;
import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
import org.apache.kylin.common.persistence.transaction.AuditLogReplayWorker;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TestUtils;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.apache.kylin.job.shaded.org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.kylin.junit.JdbcInfo;
import org.apache.kylin.junit.annotation.JdbcMetadataInfo;
import org.apache.kylin.junit.annotation.MetadataInfo;
import org.apache.kylin.junit.annotation.OverwriteProp;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.shaded.influxdb.org.influxdb.impl.InfluxDBService;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.security.authentication.TestingAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.backoff.ExponentialBackOff;

@MetadataInfo(onlyProps = true)
@JdbcMetadataInfo
/* loaded from: input_file:org/apache/kylin/common/persistence/metadata/JdbcAuditLogStoreTest.class */
class JdbcAuditLogStoreTest {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcAuditLogStoreTest.class);
    private static final String LOCAL_INSTANCE = "127.0.0.1";
    private final Charset charset = Charset.defaultCharset();

    JdbcAuditLogStoreTest() {
    }

    @Test
    void testUpdateResourceWithLog(JdbcInfo jdbcInfo) throws Exception {
        UnitOfWork.doInTransactionWithRetry(() -> {
            ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
            kylinMetaStore.checkAndPutResource("/p1/abc", ByteSource.wrap("abc".getBytes(this.charset)), -1L);
            kylinMetaStore.checkAndPutResource("/p1/abc2", ByteSource.wrap("abc".getBytes(this.charset)), -1L);
            kylinMetaStore.checkAndPutResource("/p1/abc3", ByteSource.wrap("abc".getBytes(this.charset)), -1L);
            kylinMetaStore.checkAndPutResource("/p1/abc3", ByteSource.wrap("abc2".getBytes(this.charset)), 0L);
            kylinMetaStore.deleteResource("/p1/abc");
            return 0;
        }, "p1");
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        List query = jdbcTemplate.query("select * from " + metadataUrl.getIdentifier() + "_audit_log", new AuditLogRowMapper());
        Assert.assertEquals(5L, query.size());
        Assert.assertEquals("/p1/abc", ((AuditLog) query.get(0)).getResPath());
        Assert.assertEquals("/p1/abc2", ((AuditLog) query.get(1)).getResPath());
        Assert.assertEquals("/p1/abc3", ((AuditLog) query.get(2)).getResPath());
        Assert.assertEquals("/p1/abc3", ((AuditLog) query.get(3)).getResPath());
        Assert.assertEquals("/p1/abc", ((AuditLog) query.get(4)).getResPath());
        Assert.assertEquals(0L, ((AuditLog) query.get(0)).getMvcc());
        Assert.assertEquals(0L, ((AuditLog) query.get(1)).getMvcc());
        Assert.assertEquals(0L, ((AuditLog) query.get(2)).getMvcc());
        Assert.assertEquals(1L, ((AuditLog) query.get(3)).getMvcc());
        Assert.assertNull(((AuditLog) query.get(4)).getMvcc());
        Assert.assertEquals(1L, query.stream().map((v0) -> {
            return v0.getUnitId();
        }).distinct().count());
        SecurityContextHolder.getContext().setAuthentication(new TestingAuthenticationToken("USER1", "ADMIN"));
        UnitOfWork.doInTransactionWithRetry(() -> {
            ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
            kylinMetaStore.deleteResource("/p1/abc2");
            kylinMetaStore.deleteResource("/p1/abc3");
            return 0;
        }, "p1");
        List query2 = jdbcTemplate.query("select * from " + metadataUrl.getIdentifier() + "_audit_log", new AuditLogRowMapper());
        Assert.assertEquals(7L, query2.size());
        Assert.assertNull(((AuditLog) query2.get(5)).getMvcc());
        Assert.assertNull(((AuditLog) query2.get(6)).getMvcc());
        Assert.assertEquals("USER1", ((AuditLog) query2.get(5)).getOperator());
        Assert.assertEquals("USER1", ((AuditLog) query2.get(6)).getOperator());
        Assert.assertEquals(2L, query2.stream().map((v0) -> {
            return v0.getUnitId();
        }).distinct().count());
        ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).getAuditLogStore().close();
    }

    @Test
    void testRestore(JdbcInfo jdbcInfo) throws Exception {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 1, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc", null, null, null, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.catchup();
        Assert.assertEquals(3L, kylinMetaStore.listResourcesRecursively("/").size());
        for (int i = 0; i < 1000; i++) {
            String str = InfluxDBService.P + (i + 1000);
            jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/" + str + "/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/" + str + "/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/" + str + "/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/" + str + "/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 1, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/" + str + "/abc", null, null, null, randomUUIDStr, null}));
        }
        kylinMetaStore.getAuditLogStore().catchupWithTimeout();
        Awaitility.await().atMost(6L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(2003 == kylinMetaStore.listResourcesRecursively("/").size());
        });
        kylinMetaStore.getAuditLogStore().catchupWithTimeout();
        Assert.assertEquals(2003L, kylinMetaStore.listResourcesRecursively("/").size());
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
        kylinMetaStore.getAuditLogStore().close();
    }

    @Test
    void testHandleVersionConflict(JdbcInfo jdbcInfo) throws Exception {
        TestUtils.getTestConfig().setProperty("kylin.auditlog.replay-groupby-project-reload-enable", "false");
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.getMetadataStore().putResource(new RawResource("/_global/p1/abc3", ByteSource.wrap("abc".getBytes(this.charset)), System.currentTimeMillis(), 2L), "sdfasf", -1L);
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 2, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.catchup();
        Assert.assertEquals(2L, kylinMetaStore.listResourcesRecursively("/").size());
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
    }

    @Test
    public void testWaitLogAllCommit(JdbcInfo jdbcInfo) throws Exception {
        TestUtils.getTestConfig().setProperty("kylin.auditlog.replay-groupby-project-reload-enable", "false");
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, "insert into %s (id, meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance) values (?, ?, ?, ?, ?, ?, ?, ?)", metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{22, "/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{4, "/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.getAuditLogStore().restore(3L);
        Assert.assertEquals(3L, kylinMetaStore.listResourcesRecursively("/").size());
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
    }

    @Test
    void testWaitLogAllCommit_DelayQueue(JdbcInfo jdbcInfo) throws Exception {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, "insert into %s (id, meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance) values (?, ?, ?, ?, ?, ?, ?, ?)", metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{900, "/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{4, "/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.getAuditLogStore().restore(3L);
        Assertions.assertEquals(3, kylinMetaStore.listResourcesRecursively("/").size());
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, "insert into %s (id, meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance) values (?, ?, ?, ?, ?, ?, ?, ?)", metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{800, "/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{801, "/_global/p1/abc4", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.getAuditLogStore().catchup();
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(5 == kylinMetaStore.listResourcesRecursively("/").size());
        });
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
    }

    @Test
    void testFetchById(JdbcInfo jdbcInfo) throws IOException {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, "insert into %s (id, meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance) values (?, ?, ?, ?, ?, ?, ?, ?)", metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{1, "/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{2, "/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{3, "/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{4, "/_global/p1/abc4", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        List<Long> asList = Arrays.asList(2L, 3L);
        List<AuditLog> fetch = ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).fetch(asList);
        Assertions.assertEquals(2, fetch.size());
        Assertions.assertEquals(asList, fetch.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        kylinMetaStore.getAuditLogStore().close();
    }

    @Test
    void testGetMinMaxId(JdbcInfo jdbcInfo) throws IOException {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, "insert into %s (id, meta_key,meta_content,meta_ts,meta_mvcc,unit_id,operator,instance) values (?, ?, ?, ?, ?, ?, ?, ?)", metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{11, "/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{22, "/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{33, "/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{101, "/_global/p1/abc4", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        AuditLogStore auditLogStore = kylinMetaStore.getAuditLogStore();
        Assertions.assertEquals(101L, auditLogStore.getMaxId());
        Assertions.assertEquals(11L, auditLogStore.getMinId());
        auditLogStore.close();
    }

    @Test
    void testSaveWait_WithoutSleep(JdbcInfo jdbcInfo) throws IOException {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        UnitMessages unitMessages = new UnitMessages(Collections.singletonList(new ResourceCreateOrUpdateEvent(new RawResource("/p1/test", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)), System.currentTimeMillis(), 0L))));
        AuditLogStore auditLogStore = kylinMetaStore.getAuditLogStore();
        auditLogStore.save(unitMessages);
        Assertions.assertEquals(1, auditLogStore.fetch(0L, 2L).size());
        auditLogStore.close();
    }

    @Test
    void testSaveWait_WithSleep(JdbcInfo jdbcInfo) throws IOException {
        TestUtils.getTestConfig().setProperty("kylin.env.unitofwork-simulation-enabled", "true");
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        UnitMessages unitMessages = new UnitMessages(Collections.singletonList(new ResourceCreateOrUpdateEvent(new RawResource("/p1/test", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)), System.currentTimeMillis(), 0L))));
        AuditLogStore auditLogStore = kylinMetaStore.getAuditLogStore();
        UnitOfWork.doInTransactionWithRetry(UnitOfWorkParams.builder().unitName("_global").sleepMills(100L).maxRetry(1).processor(() -> {
            auditLogStore.save(unitMessages);
            return 0;
        }).build());
        Assertions.assertEquals(1, auditLogStore.fetch(0L, 2L).size());
        auditLogStore.close();
    }

    @Test
    void testSaveWait_WithSleepNotInTrans(JdbcInfo jdbcInfo) throws IOException {
        TestUtils.getTestConfig().setProperty("kylin.env.unitofwork-simulation-enabled", "true");
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        UnitMessages unitMessages = new UnitMessages(Collections.singletonList(new ResourceCreateOrUpdateEvent(new RawResource("/p1/test", ByteSource.wrap("test content".getBytes(StandardCharsets.UTF_8)), System.currentTimeMillis(), 0L))));
        AuditLogStore auditLogStore = kylinMetaStore.getAuditLogStore();
        auditLogStore.save(unitMessages);
        Assertions.assertEquals(1, auditLogStore.fetch(0L, 2L).size());
        auditLogStore.close();
    }

    @Test
    void testRestoreWithoutOrder(JdbcInfo jdbcInfo) throws Exception {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        String randomUUIDStr2 = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr2, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 1, randomUUIDStr2, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc", null, null, null, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.catchup();
        Assert.assertEquals(3L, kylinMetaStore.listResourcesRecursively("/").size());
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
    }

    @Test
    void testRestore_WhenOtherAppend(JdbcInfo jdbcInfo) throws Exception {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String str = jdbcInfo.getTableName() + "_audit_log";
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        new Thread(() -> {
            int i = 0;
            while (!atomicBoolean.get()) {
                String randomUUIDStr = RandomUtil.randomUUIDStr();
                jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, str), Arrays.asList(new Object[]{"/_global/p0/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), Integer.valueOf(i), randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p0/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), Integer.valueOf(i), randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p0/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), Integer.valueOf(i), randomUUIDStr, null, LOCAL_INSTANCE}));
                i++;
            }
        }).start();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(((Long) jdbcTemplate.queryForObject(new StringBuilder().append("select count(1) from ").append(str).toString(), Long.class)).longValue() > 1000);
        });
        kylinMetaStore.catchup();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(((Long) jdbcTemplate.queryForObject(new StringBuilder().append("select count(1) from ").append(str).toString(), Long.class)).longValue() > ExponentialBackOff.DEFAULT_INITIAL_INTERVAL);
        });
        Assert.assertEquals(4L, kylinMetaStore.listResourcesRecursively("/").size());
        atomicBoolean.compareAndSet(false, true);
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
    }

    @OverwriteProp(key = "kylin.metadata.audit-log.max-size", value = "1000")
    @Test
    void testRotate(JdbcInfo jdbcInfo) throws Exception {
        KylinConfig testConfig = TestUtils.getTestConfig();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        JdbcAuditLogStore jdbcAuditLogStore = new JdbcAuditLogStore(testConfig, jdbcTemplate, new DataSourceTransactionManager(BasicDataSourceFactory.createDataSource(JdbcUtil.datasourceParameters(testConfig.getMetadataUrl()))), jdbcInfo.getTableName() + "_audit_log");
        jdbcAuditLogStore.createIfNotExist();
        String str = jdbcInfo.getTableName() + "_audit_log";
        for (int i = 0; i < 1000; i++) {
            String str2 = InfluxDBService.P + (i + 1000);
            String randomUUIDStr = RandomUtil.randomUUIDStr();
            jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, str), Arrays.asList(new Object[]{"/" + str2 + "/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/" + str2 + "/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/" + str2 + "/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/" + str2 + "/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 1, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/" + str2 + "/abc", null, null, null, randomUUIDStr, null, LOCAL_INSTANCE}));
        }
        jdbcAuditLogStore.rotate();
        Assert.assertEquals(1000L, ((Long) jdbcTemplate.queryForObject("select count(1) from " + str, Long.class)).longValue());
        TestUtils.getTestConfig().setProperty("kylin.metadata.audit-log.max-size", "1500");
        jdbcAuditLogStore.rotate();
        Assert.assertEquals(1000L, ((Long) jdbcTemplate.queryForObject("select count(1) from " + str, Long.class)).longValue());
        jdbcAuditLogStore.close();
    }

    @Test
    public void testGet() throws IOException {
        UnitOfWork.doInTransactionWithRetry(() -> {
            ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).checkAndPutResource("/p1/123", ByteSource.wrap("123".getBytes(this.charset)), -1L);
            return 0;
        }, "p1");
        AuditLogStore auditLogStore = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).getAuditLogStore();
        Assert.assertNotNull(auditLogStore.get("/p1/123", 0L));
        Assert.assertNull(auditLogStore.get("/p1/126", 0L));
        Assert.assertNull(auditLogStore.get("/p1/abc", 1L));
        auditLogStore.close();
    }

    @Test
    void testMannualHandleReplay(JdbcInfo jdbcInfo) throws Exception {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        changeProject("abc", false, jdbcInfo);
        new JdbcAuditLogStore(TestUtils.getTestConfig()).catchupWithTimeout();
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
    }

    @Test
    void testStopReplay() throws IOException {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        kylinMetaStore.getAuditLogStore().close();
        Assertions.assertThrows(RejectedExecutionException.class, () -> {
            kylinMetaStore.catchup();
            ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
        });
        kylinMetaStore.getAuditLogStore().close();
    }

    @Test
    void testRestartReplay(JdbcInfo jdbcInfo) throws Exception {
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(TestUtils.getTestConfig());
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assert.assertEquals(1L, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/p1/abc", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 1, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abc", null, null, null, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.catchup();
        Assert.assertEquals(3L, kylinMetaStore.listResourcesRecursively("/").size());
        kylinMetaStore.getAuditLogStore().pause();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/p1/abcd", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/p1/abce", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        kylinMetaStore.getAuditLogStore().reInit();
        Awaitility.await().atMost(6L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(5 == kylinMetaStore.listResourcesRecursively("/").size());
        });
        kylinMetaStore.getAuditLogStore().close();
    }

    @Test
    void testQueryNodeAuditLogCatchup(JdbcInfo jdbcInfo) {
        KylinConfig testConfig = TestUtils.getTestConfig();
        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(testConfig);
        kylinMetaStore.checkAndPutResource(ResourceStore.METASTORE_UUID_TAG, (String) new StringEntity(RandomUtil.randomUUIDStr()), (Serializer<String>) StringEntity.serializer);
        Assertions.assertEquals(1, kylinMetaStore.listResourcesRecursively("/").size());
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{"/_global/project/abc1", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/project/abc2", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/project/abc3", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        Assertions.assertEquals(0L, kylinMetaStore.getOffset());
        ((JdbcAuditLogStore) kylinMetaStore.getAuditLogStore()).forceClose();
        KylinConfig createKylinConfig = KylinConfig.createKylinConfig(testConfig);
        createKylinConfig.setProperty("kylin.server.mode", "query");
        createKylinConfig.setProperty("kylin.server.store-type", QueryUtil.JDBC);
        ResourceStore kylinMetaStore2 = ResourceStore.getKylinMetaStore(createKylinConfig);
        Assertions.assertEquals(3L, kylinMetaStore2.getOffset());
        AuditLogStore auditLogStore = kylinMetaStore2.getAuditLogStore();
        AuditLogReplayWorker auditLogReplayWorker = (AuditLogReplayWorker) ReflectionTestUtils.getField(auditLogStore, "replayWorker");
        Assertions.assertNotNull(auditLogReplayWorker);
        Assertions.assertEquals(0L, auditLogReplayWorker.getLogOffset());
        AuditLogReplayWorker auditLogReplayWorker2 = (AuditLogReplayWorker) Mockito.spy(auditLogReplayWorker);
        ((AuditLogReplayWorker) Mockito.doNothing().when(auditLogReplayWorker2)).catchupToMaxId(Mockito.anyLong());
        ReflectionTestUtils.setField(auditLogStore, "replayWorker", auditLogReplayWorker2);
        kylinMetaStore2.getMetadataStore().setAuditLogStore(auditLogStore);
        kylinMetaStore2.catchup();
        Assertions.assertEquals(3L, ((JdbcAuditLogStore) kylinMetaStore2.getMetadataStore().getAuditLogStore()).replayWorker.getLogOffset());
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), Arrays.asList(new Object[]{ResourceStore.METASTORE_IMAGE, "{\"offset\":5}".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}, new Object[]{"/_global/project/abc4", "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE}));
        ReflectionTestUtils.setField(auditLogStore, "replayWorker", auditLogReplayWorker);
        kylinMetaStore2.getMetadataStore().setAuditLogStore(auditLogStore);
        kylinMetaStore2.catchup();
        kylinMetaStore2.catchup();
        Assertions.assertEquals(5L, ((JdbcAuditLogStore) kylinMetaStore2.getMetadataStore().getAuditLogStore()).replayWorker.getLogOffset());
        ((JdbcAuditLogStore) kylinMetaStore2.getAuditLogStore()).forceClose();
    }

    void changeProject(String str, boolean z, JdbcInfo jdbcInfo) throws Exception {
        JdbcTemplate jdbcTemplate = jdbcInfo.getJdbcTemplate();
        StorageURL metadataUrl = TestUtils.getTestConfig().getMetadataUrl();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        Object[] objArr = z ? new Object[]{"/_global/project/" + str + MetadataConstants.FILE_SURFIX, null, Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE} : new Object[]{"/_global/project/" + str + MetadataConstants.FILE_SURFIX, "abc".getBytes(this.charset), Long.valueOf(System.currentTimeMillis()), 0, randomUUIDStr, null, LOCAL_INSTANCE};
        ArrayList arrayList = new ArrayList();
        arrayList.add(objArr);
        jdbcTemplate.batchUpdate(String.format(Locale.ROOT, JdbcAuditLogStore.INSERT_SQL, metadataUrl.getIdentifier() + "_audit_log"), arrayList);
    }
}
