package org.apache.hudi.utilities.multitable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.utilities.multitable.HoodieMultiTableServicesMain;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.class */
class TestHoodieMultiTableServicesMain extends HoodieCommonTestHarness implements SparkProvider {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieMultiTableServicesMain.class);
    private static SparkSession spark;
    private static SQLContext sqlContext;
    private static JavaSparkContext jsc;
    private static HoodieSparkEngineContext context;
    protected boolean initialized = false;
    protected transient HoodieTestDataGenerator dataGen = null;

    TestHoodieMultiTableServicesMain() {
    }

    @BeforeEach
    public void init() throws IOException, ExecutionException, InterruptedException {
        if (!(spark != null)) {
            SparkConf conf = conf();
            HoodieSparkKryoRegistrar$.MODULE$.register(conf);
            SparkRDDReadClient.addHoodieSupport(conf);
            spark = SparkSession.builder().config(conf).getOrCreate();
            sqlContext = spark.sqlContext();
            jsc = new JavaSparkContext(spark.sparkContext());
            context = new HoodieSparkEngineContext(jsc);
        }
        initPath();
        prepareData();
    }

    @Test
    public void testRunAllServices() throws IOException, ExecutionException, InterruptedException {
        HoodieMultiTableServicesMain.Config hoodieMultiServiceConfig = getHoodieMultiServiceConfig();
        hoodieMultiServiceConfig.batch = true;
        HoodieTableMetaClient metaClient = getMetaClient("table1");
        HoodieTableMetaClient metaClient2 = getMetaClient("table2");
        new HoodieMultiTableServicesMain(jsc, hoodieMultiServiceConfig).startServices();
        Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().countInstants());
        Assertions.assertEquals(1, metaClient2.reloadActiveTimeline().getCleanerTimeline().countInstants());
        Assertions.assertEquals(2, metaClient.reloadActiveTimeline().getDeltaCommitTimeline().countInstants());
        Assertions.assertEquals(2, metaClient2.reloadActiveTimeline().getDeltaCommitTimeline().countInstants());
        Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().countInstants());
        Assertions.assertEquals(1, metaClient2.reloadActiveTimeline().getCompletedReplaceTimeline().countInstants());
        Assertions.assertEquals(4, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants());
        Assertions.assertEquals(4, metaClient2.reloadActiveTimeline().getCommitsTimeline().countInstants());
    }

    @Test
    public void testRunAllServicesForSingleTable() throws IOException, ExecutionException, InterruptedException {
        HoodieMultiTableServicesMain.Config hoodieMultiServiceConfig = getHoodieMultiServiceConfig();
        HoodieTableMetaClient metaClient = getMetaClient("table1");
        hoodieMultiServiceConfig.batch = true;
        hoodieMultiServiceConfig.basePath = Collections.singletonList(metaClient.getBasePath().toString());
        new HoodieMultiTableServicesMain(jsc, hoodieMultiServiceConfig).startServices();
        Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().countInstants());
        Assertions.assertEquals(2, metaClient.reloadActiveTimeline().getDeltaCommitTimeline().countInstants());
        Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().countInstants());
        Assertions.assertEquals(4, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants());
    }

    @Test
    public void testStreamRunAllServices() throws IOException, ExecutionException, InterruptedException {
        HoodieMultiTableServicesMain hoodieMultiTableServicesMain = new HoodieMultiTableServicesMain(jsc, getHoodieMultiServiceConfig());
        new Thread(() -> {
            try {
                Thread.sleep(10000L);
                LOG.info("Shutdown the table services");
                hoodieMultiTableServicesMain.cancel();
            } catch (InterruptedException e) {
                LOG.warn("InterruptedException: ", e);
            }
        }).start();
        hoodieMultiTableServicesMain.startServices();
        HoodieTableMetaClient metaClient = getMetaClient("table1");
        HoodieTableMetaClient metaClient2 = getMetaClient("table2");
        Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().countInstants());
        Assertions.assertEquals(1, metaClient2.reloadActiveTimeline().getCleanerTimeline().countInstants());
        Assertions.assertEquals(4, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants());
        Assertions.assertEquals(4, metaClient2.reloadActiveTimeline().getCommitsTimeline().countInstants());
    }

    @Test
    public void testRunMultiTableServicesWithOneWrongPath() throws IOException {
        HoodieMultiTableServicesMain.Config hoodieMultiServiceConfig = getHoodieMultiServiceConfig();
        hoodieMultiServiceConfig.autoDiscovery = false;
        hoodieMultiServiceConfig.batch = true;
        HoodieTableMetaClient metaClient = getMetaClient("table1");
        hoodieMultiServiceConfig.configs.add(String.format("%s=%s", "hoodie.tableservice.skipNonHudiTable", "true"));
        hoodieMultiServiceConfig.configs.add(String.format("%s=%s", "hoodie.tableservice.tablesToServe", metaClient.getBasePath() + ",file:///fakepath"));
        HoodieMultiTableServicesMain hoodieMultiTableServicesMain = new HoodieMultiTableServicesMain(jsc, hoodieMultiServiceConfig);
        try {
            hoodieMultiTableServicesMain.startServices();
        } catch (Exception e) {
            Assertions.assertFalse(e instanceof TableNotFoundException);
        }
        hoodieMultiServiceConfig.batch = false;
        new Thread(() -> {
            try {
                Thread.sleep(10000L);
                LOG.info("Shutdown the table services");
                hoodieMultiTableServicesMain.cancel();
            } catch (InterruptedException e2) {
                LOG.warn("InterruptedException: ", e2);
            }
        }).start();
        try {
            hoodieMultiTableServicesMain.startServices();
        } catch (Exception e2) {
            Assertions.assertFalse(e2 instanceof TableNotFoundException);
        }
        hoodieMultiServiceConfig.batch = true;
        hoodieMultiServiceConfig.configs.add(String.format("%s=%s", "hoodie.tableservice.skipNonHudiTable", "false"));
        try {
            hoodieMultiTableServicesMain.startServices();
        } catch (Exception e3) {
            Assertions.assertTrue(e3 instanceof TableNotFoundException);
        }
    }

    private void prepareData() throws IOException {
        initTestDataGenerator();
        HoodieTableMetaClient metaClient = getMetaClient("table1");
        HoodieTableMetaClient metaClient2 = getMetaClient("table2");
        String createNewInstantTime = InProcessTimeGenerator.createNewInstantTime(0L);
        writeToTable(metaClient.getBasePath(), createNewInstantTime, false);
        writeToTable(metaClient2.getBasePath(), createNewInstantTime, false);
        String createNewInstantTime2 = InProcessTimeGenerator.createNewInstantTime(1L);
        writeToTable(metaClient.getBasePath(), createNewInstantTime2, true);
        writeToTable(metaClient2.getBasePath(), createNewInstantTime2, true);
        Assertions.assertEquals(0, metaClient.reloadActiveTimeline().getCleanerTimeline().countInstants());
        Assertions.assertEquals(0, metaClient2.reloadActiveTimeline().getCleanerTimeline().countInstants());
    }

    private void writeToTable(StoragePath storagePath, String str, boolean z) throws IOException {
        SparkRDDWriteClient sparkRDDWriteClient = new SparkRDDWriteClient(context, getWriteConfigBuilder(storagePath, JdbcTestUtils.JDBC_USER).build());
        sparkRDDWriteClient.startCommitWithTime(str);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(sparkRDDWriteClient.upsert(jsc.parallelize(z ? this.dataGen.generateUpdates(str, 100) : this.dataGen.generateInserts(str, 100), 8), str).collect());
    }

    private HoodieWriteConfig.Builder getWriteConfigBuilder(StoragePath storagePath, String str) {
        new Properties().setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        return HoodieWriteConfig.newBuilder().withPath(storagePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(4, 4).withBulkInsertParallelism(4).withFinalizeWriteParallelism(2).withProps(makeIndexConfig(HoodieIndex.IndexType.BUCKET)).withTableServicesEnabled(false).withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner("org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).forTable(str);
    }

    protected HoodieTableMetaClient getMetaClient(String str) throws IOException {
        String str2 = "file://" + this.tempDir.toAbsolutePath() + "/" + str;
        Path path = new Path(str2);
        path.getFileSystem(jsc.hadoopConfiguration()).mkdirs(path);
        Properties properties = new Properties();
        properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        properties.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "_row_key");
        return HoodieTestUtils.init(HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), str2, getTableType(), properties);
    }

    private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
        Properties properties = new Properties();
        HoodieIndexConfig.Builder withIndexType = HoodieIndexConfig.newBuilder().withIndexType(indexType);
        if (indexType.equals(HoodieIndex.IndexType.BUCKET)) {
            properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
            withIndexType.fromProperties(properties).withIndexKeyField("_row_key").withBucketNum("1").withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
            properties.putAll(withIndexType.build().getProps());
            properties.putAll(HoodieLayoutConfig.newBuilder().fromProperties(properties).withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps());
        }
        return properties;
    }

    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }

    private HoodieMultiTableServicesMain.Config getHoodieMultiServiceConfig() {
        HoodieMultiTableServicesMain.Config config = new HoodieMultiTableServicesMain.Config();
        config.autoDiscovery = true;
        config.enableCompaction = true;
        config.enableClustering = true;
        config.enableClean = true;
        config.enableArchive = true;
        ArrayList arrayList = new ArrayList();
        arrayList.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS));
        arrayList.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), "false"));
        arrayList.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key(), "1"));
        arrayList.add(String.format("%s=%s", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "0"));
        config.configs = arrayList;
        config.compactionRunningMode = "scheduleandexecute";
        config.compactionStrategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();
        config.clusteringRunningMode = "scheduleandexecute";
        config.basePath = Collections.singletonList(this.tempDir.toAbsolutePath().toString());
        config.scheduleDelay = 50000;
        return config;
    }

    protected void initTestDataGenerator() {
        this.dataGen = new HoodieTestDataGenerator();
    }

    public HoodieEngineContext context() {
        return context;
    }

    public SparkSession spark() {
        return spark;
    }

    public SQLContext sqlContext() {
        return sqlContext;
    }

    public JavaSparkContext jsc() {
        return jsc;
    }

    @AfterAll
    public static synchronized void cleanUpAfterAll() {
        if (spark != null) {
            spark.close();
            spark = null;
        }
    }
}
