package org.apache.kylin.common.persistence;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.Generated;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
import org.apache.kylin.common.persistence.transaction.AbstractAuditLogReplayWorker;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.guava30.shaded.common.base.Joiner;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.apache.kylin.job.shaded.org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.kylin.junit.annotation.MetadataInfo;
import org.apache.kylin.junit.annotation.OverwriteProp;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

/**
 * Exercises audit-log replay against an in-memory H2 metadata store when metadata rows and
 * audit-log rows are written directly through JDBC while other writers go through
 * {@link UnitOfWork}.
 *
 * <p>The test passes when an {@link AbstractAuditLogReplayWorker.EndReloadEvent} has been observed
 * and the resource store lists every resource written during the scenario.
 */
@MetadataInfo(onlyProps = true)
public class JdbcAuditLogRecoveryTest {
    static final String META_TABLE_KEY = "META_TABLE_KEY";
    static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
    static final String META_TABLE_TS = "META_TABLE_TS";
    static final String META_TABLE_MVCC = "META_TABLE_MVCC";
    /** Insert template for the metadata table; {@code %s} is the table name. */
    static final String INSERT_SQL = "insert into %s (" + Joiner.on(",").join(META_TABLE_KEY, META_TABLE_CONTENT, META_TABLE_TS, META_TABLE_MVCC) + ") values (?, ?, ?, ?)";
    /** Update template for the metadata table; {@code %s} is the table name. */
    static final String UPDATE_SQL = "update %s set META_TABLE_CONTENT=?, META_TABLE_MVCC=?, META_TABLE_TS=? where META_TABLE_KEY=?";

    static final String AUDIT_LOG_TABLE_KEY = "meta_key";
    static final String AUDIT_LOG_TABLE_CONTENT = "meta_content";
    static final String AUDIT_LOG_TABLE_TS = "meta_ts";
    static final String AUDIT_LOG_TABLE_MVCC = "meta_mvcc";
    static final String AUDIT_LOG_TABLE_UNIT = "unit_id";
    static final String AUDIT_LOG_TABLE_OPERATOR = "operator";
    static final String AUDIT_LOG_TABLE_INSTANCE = "instance";
    /** Insert template for the audit-log table; {@code %s} is the table name. */
    static final String INSERT_AUDIT_LOG_SQL = "insert into %s (" + Joiner.on(",").join(AUDIT_LOG_TABLE_KEY, AUDIT_LOG_TABLE_CONTENT, AUDIT_LOG_TABLE_TS, AUDIT_LOG_TABLE_MVCC, AUDIT_LOG_TABLE_UNIT, AUDIT_LOG_TABLE_OPERATOR, AUDIT_LOG_TABLE_INSTANCE) + ") values (?, ?, ?, ?, ?, ?, ?)";

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcAuditLogRecoveryTest.class);

    /** First index (inclusive) of the {@code /p2/abc<index>} entries written through raw JDBC. */
    private static final int RAW_ENTRY_START = 1000;
    /** Number of {@code /p2/abc<index>} entries written through raw JDBC. */
    private static final int RAW_ENTRY_COUNT = 200;

    private final Charset charset = Charset.defaultCharset();

    /**
     * Event-bus subscriber that records the most recent reload event:
     * {@code 0} = nothing seen yet, {@code 1} = reload started, {@code -1} = reload ended.
     */
    public static class StatusListener {
        // Updated from the thread that delivers the event and polled by the test's await
        // condition, so it must be volatile for the update to be reliably visible.
        volatile int status = 0;

        StatusListener() {
        }

        @Subscribe
        public void onStart(AbstractAuditLogReplayWorker.StartReloadEvent startReloadEvent) {
            this.status = 1;
        }

        @Subscribe
        public void onEnd(AbstractAuditLogReplayWorker.EndReloadEvent endReloadEvent) {
            this.status = -1;
        }
    }

    /** Drops every object in the H2 database after each test so no state carries over. */
    @AfterEach
    public void destroy() throws Exception {
        getJdbcTemplate().batchUpdate("DROP ALL OBJECTS");
    }

    /**
     * Scenario:
     * <ol>
     *   <li>create projects {@code p1} and {@code p2} through {@link UnitOfWork};</li>
     *   <li>on a background thread, open a READ_COMMITTED transaction on the audit-log store's
     *       transaction manager, then concurrently (a) write one {@code /p1/abc-<ts>} resource
     *       through {@link UnitOfWork} and (b) insert {@value #RAW_ENTRY_COUNT} {@code /p2/abc<n>}
     *       rows straight into the metadata and audit-log tables;</li>
     *   <li>commit, catch up, rewrite every {@code /p2} entry through {@link UnitOfWork}, and
     *       catch up again;</li>
     *   <li>wait for an end-of-reload event and assert the final resource count.</li>
     * </ol>
     */
    @OverwriteProp(key = "kylin.metadata.url", value = "test@jdbc,driverClassName=org.h2.Driver,url=jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1,username=sa,password=")
    @Test
    public void testAuditLogOutOfOrder() throws Exception {
        StatusListener statusListener = new StatusListener();
        String metadataTable = getTestConfig().getMetadataUrl().getIdentifier();
        EventBusFactory.getInstance().register(statusListener, true);
        ResourceStore systemStore = ResourceStore.getKylinMetaStore(getTestConfig());
        JdbcTemplate jdbcTemplate = getJdbcTemplate();
        JdbcAuditLogStore auditLogStore = (JdbcAuditLogStore) systemStore.getAuditLogStore();
        DataSourceTransactionManager transactionManager = auditLogStore.getTransactionManager();

        createProject("p1");
        createProject("p2");
        Assert.assertEquals(2L, systemStore.listResourcesRecursively("/").size());

        new Thread(() -> {
            DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
            // Same value (2) as the original literal, spelled via the inherited constant.
            definition.setIsolationLevel(DefaultTransactionDefinition.ISOLATION_READ_COMMITTED);
            TransactionStatus transaction = transactionManager.getTransaction(definition);

            // Concurrent writer: delays 500 ms inside its unit of work, writes one /p1 resource,
            // then triggers the first catch-up while the outer transaction is still open.
            Thread p1Writer = new Thread(() -> {
                UnitOfWork.doInTransactionWithRetry(() -> {
                    Thread.sleep(500L);
                    ResourceStore store = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
                    String path = "/p1/abc-" + System.currentTimeMillis();
                    RawResource existing = store.getResource(path);
                    long expectedMvcc = existing == null ? -1L : existing.getMvcc();
                    store.checkAndPutResource(path, ByteSource.wrap("abc".getBytes(this.charset)), System.currentTimeMillis(), expectedMvcc);
                    return 0;
                }, "p1");
                catchupQuietly(auditLogStore, "catchup 1st phase failed");
            });
            p1Writer.start();

            // All raw rows share a single unit id in the audit log.
            String unitId = RandomUtil.randomUUIDStr();
            IntStream.range(RAW_ENTRY_START, RAW_ENTRY_START + RAW_ENTRY_COUNT)
                    .forEach(index -> writeRawEntry(jdbcTemplate, systemStore, metadataTable, unitId, index));

            try {
                p1Writer.join();
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                log.debug("wait for thread join failed", e);
                // Preserve the interrupt for whoever owns this thread instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            transactionManager.commit(transaction);
            catchupQuietly(auditLogStore, "catchup 2nd phase failed");

            // Rewrite every raw /p2 entry through the regular transactional path.
            UnitOfWork.doInTransactionWithRetry(() -> {
                ResourceStore store = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
                IntStream.range(RAW_ENTRY_START, RAW_ENTRY_START + RAW_ENTRY_COUNT).forEach(index -> {
                    String path = "/p2/abc" + index;
                    RawResource existing = store.getResource(path);
                    long expectedMvcc = existing == null ? -1L : existing.getMvcc();
                    store.checkAndPutResource(path, ByteSource.wrap((path + "-version2").getBytes(this.charset)), System.currentTimeMillis(), expectedMvcc);
                });
                return 0;
            }, "p2");
            catchupQuietly(auditLogStore, "catchup 3rd phase failed");
        }).start();

        Awaitility.await().atMost(20L, TimeUnit.SECONDS).until(() -> statusListener.status == -1);
        // 203 matches 2 project entries + 1 /p1 resource + 200 /p2 resources.
        Assert.assertEquals(203L, systemStore.listResourcesRecursively("/").size());
    }

    /** Creates {@code /_global/project/<project>.json} with random content inside a unit of work. */
    private void createProject(String project) {
        UnitOfWork.doInTransactionWithRetry(() -> {
            ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).checkAndPutResource(
                    "/_global/project/" + project + ".json", new StringEntity(RandomUtil.randomUUIDStr()),
                    StringEntity.serializer);
            return null;
        }, project);
    }

    /**
     * Writes {@code /p2/abc<index>} directly through JDBC: inserts or updates the row in the
     * metadata table and appends a matching row to {@code <metadataTable>_audit_log}.
     *
     * @param jdbcTemplate  template used for the raw statements
     * @param store         store consulted for the entry's current mvcc
     * @param metadataTable name of the metadata table
     * @param unitId        value written to the audit log's {@code unit_id} column
     * @param index         numeric suffix of the resource path
     */
    private void writeRawEntry(JdbcTemplate jdbcTemplate, ResourceStore store, String metadataTable, String unitId, int index) {
        String path = "/p2/abc" + index;
        long timestamp = System.currentTimeMillis();
        RawResource existing = store.getResource(path);
        // A missing resource starts at mvcc 0; otherwise bump the stored version by one.
        long mvcc = existing == null ? 0L : existing.getMvcc() + 1;
        byte[] content = path.getBytes(this.charset);

        if (mvcc == 0) {
            jdbcTemplate.update(String.format(Locale.ROOT, INSERT_SQL, metadataTable), ps -> {
                ps.setString(1, path);
                ps.setBytes(2, content);
                ps.setLong(3, timestamp);
                ps.setLong(4, mvcc);
            });
        } else {
            jdbcTemplate.update(String.format(Locale.ROOT, UPDATE_SQL, metadataTable), ps -> {
                ps.setBytes(1, content);
                ps.setLong(2, mvcc);
                ps.setLong(3, timestamp);
                ps.setString(4, path);
            });
        }
        jdbcTemplate.update(String.format(Locale.ROOT, INSERT_AUDIT_LOG_SQL, metadataTable + "_audit_log"), ps -> {
            ps.setString(1, path);
            ps.setBytes(2, content);
            ps.setLong(3, timestamp);
            ps.setLong(4, mvcc);
            ps.setString(5, unitId);
            ps.setString(6, null);
            ps.setString(7, "127.0.0.1:7072");
        });
    }

    /**
     * Runs {@code catchupWithTimeout()} as a best-effort step: a failure is logged at debug level
     * with {@code failureMessage} and otherwise ignored so the scenario can continue.
     */
    private void catchupQuietly(JdbcAuditLogStore auditLogStore, String failureMessage) {
        try {
            auditLogStore.catchupWithTimeout();
        } catch (Exception e) {
            log.debug(failureMessage, e);
        }
    }

    /**
     * Builds a {@link JdbcTemplate} over a data source created from the configured metadata URL.
     * A new data source is created on every call.
     */
    JdbcTemplate getJdbcTemplate() throws Exception {
        return new JdbcTemplate(BasicDataSourceFactory.createDataSource(JdbcUtil.datasourceParameters(getTestConfig().getMetadataUrl())));
    }

    /** Returns the {@link KylinConfig} bound to the current environment. */
    KylinConfig getTestConfig() {
        return KylinConfig.getInstanceFromEnv();
    }
}
