package org.apache.flink.cdc.connectors.starrocks.sink.utils;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.class */
public class StarRocksSinkTestBase extends TestLogger {
    protected static final int DEFAULT_PARALLELISM = 1;
    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(DEFAULT_PARALLELISM).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class);
    protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer();

    /* loaded from: input_file:org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase$MockContext.class */
    static class MockContext implements Factory.Context {
        Configuration factoryConfiguration;

        public MockContext(Configuration configuration) {
            this.factoryConfiguration = configuration;
        }

        public Configuration getFactoryConfiguration() {
            return this.factoryConfiguration;
        }

        public Configuration getPipelineConfiguration() {
            return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
        }

        public ClassLoader getClassLoader() {
            return null;
        }
    }

    private static StarRocksContainer createStarRocksContainer() {
        return new StarRocksContainer();
    }

    @BeforeClass
    public static void startContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join();
        LOG.info("Waiting for StarRocks to launch");
        long currentTimeMillis = System.currentTimeMillis();
        new LogMessageWaitStrategy().withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s").withTimes(DEFAULT_PARALLELISM).withStartupTimeout(Duration.of(240L, ChronoUnit.SECONDS)).waitUntilReady(STARROCKS_CONTAINER);
        while (!checkBackendAvailability()) {
            if (System.currentTimeMillis() - currentTimeMillis > 240000) {
                throw new RuntimeException("StarRocks backend startup timed out.");
                break;
            } else {
                LOG.info("Waiting for backends to be available");
                Thread.sleep(1000L);
            }
        }
        LOG.info("Containers are started.");
    }

    @AfterClass
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        STARROCKS_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    public static DataSink createStarRocksDataSink(Configuration configuration) {
        return new StarRocksDataSinkFactory().createDataSink(new MockContext(configuration));
    }

    public static void executeSql(String str) {
        try {
            Container.ExecResult execInContainer = STARROCKS_CONTAINER.execInContainer(new String[]{"mysql", "--protocol=TCP", "-uroot", "-P9030", "-h127.0.0.1", "-e " + str});
            if (execInContainer.getExitCode() != 0) {
                throw new RuntimeException("Failed to execute SQL." + execInContainer.getStderr());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to execute SQL.", e);
        }
    }

    public static boolean checkBackendAvailability() {
        try {
            Container.ExecResult execInContainer = STARROCKS_CONTAINER.execInContainer(new String[]{"mysql", "--protocol=TCP", "-uroot", "-P9030", "-h127.0.0.1", "-e SHOW BACKENDS\\G"});
            if (execInContainer.getExitCode() != 0) {
                return false;
            }
            return execInContainer.getStdout().contains("*************************** 1. row ***************************");
        } catch (Exception e) {
            LOG.info("Failed to check backend status.", e);
            return false;
        }
    }

    public List<String> inspectTableSchema(TableId tableId) throws SQLException {
        ArrayList arrayList = new ArrayList();
        ResultSet executeQuery = STARROCKS_CONTAINER.createConnection(StarRocksContainer.STARROCKS_PASSWORD).createStatement().executeQuery(String.format("DESCRIBE `%s`.`%s`", tableId.getSchemaName(), tableId.getTableName()));
        while (executeQuery.next()) {
            ArrayList arrayList2 = new ArrayList();
            for (int i = DEFAULT_PARALLELISM; i <= 5; i += DEFAULT_PARALLELISM) {
                arrayList2.add(executeQuery.getString(i));
            }
            arrayList.add(String.join(" | ", arrayList2));
        }
        return arrayList;
    }

    public List<String> fetchTableContent(TableId tableId, int i) throws SQLException {
        ArrayList arrayList = new ArrayList();
        ResultSet executeQuery = STARROCKS_CONTAINER.createConnection(StarRocksContainer.STARROCKS_PASSWORD).createStatement().executeQuery(String.format("SELECT * FROM %s.%s", tableId.getSchemaName(), tableId.getTableName()));
        while (executeQuery.next()) {
            ArrayList arrayList2 = new ArrayList();
            for (int i2 = DEFAULT_PARALLELISM; i2 <= i; i2 += DEFAULT_PARALLELISM) {
                arrayList2.add(executeQuery.getString(i2));
            }
            arrayList.add(String.join(" | ", arrayList2));
        }
        return arrayList;
    }

    public static void assertEqualsInAnyOrder(List<String> list, List<String> list2) {
        Assert.assertTrue((list == null || list2 == null) ? false : true);
        assertEqualsInOrder((List) list.stream().sorted().collect(Collectors.toList()), (List) list2.stream().sorted().collect(Collectors.toList()));
    }

    public static void assertEqualsInOrder(List<String> list, List<String> list2) {
        Assert.assertTrue((list == null || list2 == null) ? false : true);
        Assert.assertEquals(list.size(), list2.size());
        Assert.assertArrayEquals(list.toArray(new String[0]), list2.toArray(new String[0]));
    }

    public static void assertMapEquals(Map<String, ?> map, Map<String, ?> map2) {
        Assert.assertTrue((map == null || map2 == null) ? false : true);
        Assert.assertEquals(map.size(), map2.size());
        for (String str : map.keySet()) {
            Assert.assertEquals(map.get(str), map2.get(str));
        }
    }
}
