package org.apache.hudi.utilities.sources;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SqlSource}, read through a {@link SourceFormatAdapter}.
 *
 * <p>Before each test, {@link #NUM_RECORDS} generated records are written as parquet under
 * {@code <basePath>/parquetFiles/1} and the parquet root is registered as the Spark temp view
 * {@value #TEST_TABLE_NAME}. Each test then points the source at a SQL query over that view
 * and checks what the adapter returns.
 */
public class TestSqlSource extends UtilitiesTestBase {
    /** Property key under which the SQL query for the source is configured in these tests. */
    private static final String SQL_QUERY_CONFIG_KEY = "hoodie.streamer.source.sql.sql.query";
    /** Name of the temp view registered over the generated parquet files. */
    private static final String TEST_TABLE_NAME = "test_sql_table";
    /** Query selecting every row of the test view. */
    private static final String SELECT_ALL_QUERY = "select * from test_sql_table";
    /** Query over the test view whose predicate can never match, so it yields no rows. */
    private static final String SELECT_NONE_QUERY = "select * from test_sql_table where 1=0";
    /** Number of records written by {@link #setup()} and expected back from unfiltered queries. */
    private static final int NUM_RECORDS = 10000;
    /** Third argument handed to {@code HoodieTestDataGenerator.generateInserts}. */
    private static final boolean USE_FLATTENED_SCHEMA = false;

    protected FilebasedSchemaProvider schemaProvider;
    protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    private String dfsRoot;
    private TypedProperties props;
    private SqlSource sqlSource;
    private SourceFormatAdapter sourceFormatAdapter;

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices();
    }

    @AfterAll
    public static void cleanupClass() throws IOException {
        UtilitiesTestBase.cleanUpUtilitiesTestServices();
    }

    /**
     * Creates the parquet root directory, resets the properties, runs the base-class setup,
     * builds the schema provider and writes the test data.
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        this.dfsRoot = UtilitiesTestBase.basePath + "/parquetFiles";
        UtilitiesTestBase.fs.mkdirs(new Path(this.dfsRoot));
        this.props = new TypedProperties();
        // NOTE(review): super.setup() deliberately stays after the directory/props initialisation,
        // matching the original statement order; confirm before reordering.
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), jsc);
        generateTestTable("1", "001", NUM_RECORDS);
    }

    @Override
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
    }

    /**
     * Writes generated insert records as parquet and (re)registers the test temp view.
     *
     * @param fileName    child path under the parquet root that receives the records
     * @param instantTime first argument forwarded to {@code generateInserts}
     * @param numRecords  number of insert records to generate
     * @throws IOException declared by the original helper; propagated from writing the parquet data
     */
    private void generateTestTable(String fileName, String instantTime, int numRecords) throws IOException {
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(
                this.dataGenerator.generateInserts(instantTime, numRecords, USE_FLATTENED_SCHEMA)),
            new Path(this.dfsRoot, fileName));
        // The view is created over the whole parquet root, not only the directory just written.
        sparkSession.read().parquet(this.dfsRoot).createOrReplaceTempView(TEST_TABLE_NAME);
    }

    /**
     * Configures {@code query} as the source query and builds a fresh {@link SqlSource} wrapped
     * in a {@link SourceFormatAdapter}; both are also stored in the corresponding fields.
     *
     * @param query SQL text to set under {@link #SQL_QUERY_CONFIG_KEY}
     * @return the adapter wrapping the newly created source
     */
    private SourceFormatAdapter createAdapter(String query) {
        this.props.setProperty(SQL_QUERY_CONFIG_KEY, query);
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter(this.sqlSource);
        return this.sourceFormatAdapter;
    }

    /**
     * Fetches a batch in row format with no previous checkpoint and counts its rows.
     *
     * @param adapter     adapter to read from
     * @param sourceLimit limit value passed to the fetch call
     * @return number of rows in the fetched batch
     */
    private long countRowsInRowFormat(SourceFormatAdapter adapter, long sourceLimit) {
        Dataset<?> rows = adapter.fetchNewDataInRowFormat(Option.empty(), sourceLimit).getBatch().get();
        return rows.count();
    }

    /** Reading in Avro format and converting back to a data frame yields every generated record. */
    @Test
    public void testSqlSourceAvroFormat() throws IOException {
        SourceFormatAdapter adapter = createAdapter(SELECT_ALL_QUERY);
        long fetchedCount = AvroConversionUtils.createDataFrame(
            JavaRDD.toRDD(adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch().get()),
            this.schemaProvider.getSourceSchema().toString(),
            sparkSession).count();
        Assertions.assertEquals(NUM_RECORDS, fetchedCount);
    }

    /** Reading in row format yields every generated record. */
    @Test
    public void testSqlSourceRowFormat() throws IOException {
        SourceFormatAdapter adapter = createAdapter(SELECT_ALL_QUERY);
        Assertions.assertEquals(NUM_RECORDS, countRowsInRowFormat(adapter, Long.MAX_VALUE));
    }

    /** The batch fetched for a query that matches no rows carries a null next checkpoint. */
    @Test
    public void testSqlSourceCheckpoint() throws IOException {
        SourceFormatAdapter adapter = createAdapter(SELECT_NONE_QUERY);
        Assertions.assertNull(
            adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getCheckpointForNextBatch());
    }

    /** A source limit smaller than the table size is expected not to truncate the result. */
    @Test
    public void testSqlSourceMoreRecordsThanSourceLimit() throws IOException {
        SourceFormatAdapter adapter = createAdapter(SELECT_ALL_QUERY);
        Assertions.assertEquals(NUM_RECORDS, countRowsInRowFormat(adapter, 1000L));
    }

    /** A query that matches no rows produces a present but empty batch. */
    @Test
    public void testSqlSourceZeroRecord() throws IOException {
        SourceFormatAdapter adapter = createAdapter(SELECT_NONE_QUERY);
        Assertions.assertEquals(0L, countRowsInRowFormat(adapter, Long.MAX_VALUE));
    }

    /** Querying a table that was never registered surfaces Spark's {@link AnalysisException}. */
    @Test
    public void testSqlSourceInvalidTable() throws IOException {
        SourceFormatAdapter adapter = createAdapter("select * from not_exist_sql_table");
        Assertions.assertThrows(AnalysisException.class, () -> {
            adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        });
    }
}
