package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Shared test harness for DFS-backed {@link Source} implementations.
 *
 * <p>Subclasses supply the source under test via {@link #prepareDFSSource()} and the on-disk
 * encoding via {@link #writeNewDataToFile(List, Path)}. The {@code dfsRoot} and
 * {@code fileSuffix} fields are read here but never assigned in this class; subclasses are
 * presumably expected to initialise them before {@link #testReadingFromSource()} runs.
 */
public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
    // Schema provider backed by the schema file that setup() places on DFS.
    FilebasedSchemaProvider schemaProvider;
    // Directory the source reads from; not assigned in this class (see class doc).
    String dfsRoot;
    // Extension appended to every generated file name; not assigned in this class.
    String fileSuffix;
    // Produces the insert records written by generateOneFile().
    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

    @BeforeClass
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass();
    }

    @AfterClass
    public static void cleanupClass() throws Exception {
        UtilitiesTestBase.cleanupClass();
    }

    /** Runs the base setup, then builds a file-based schema provider from a schema written to DFS. */
    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), this.jsc);
    }

    @Override
    @After
    public void teardown() throws Exception {
        super.teardown();
    }

    /**
     * Creates the source under test.
     *
     * @return a source that reads the files written under {@code dfsRoot}
     */
    abstract Source prepareDFSSource();

    /**
     * Writes {@code list} to {@code path} in the file format handled by the subclass's source.
     *
     * @param list records to write
     * @param path destination file
     * @throws IOException if the file cannot be written
     */
    abstract void writeNewDataToFile(List<HoodieRecord> list, Path path) throws IOException;

    /**
     * Generates insert records and writes them to {@code <dfsRoot>/<fileName><fileSuffix>}.
     *
     * @param fileName   base file name, without {@code fileSuffix}
     * @param commitTime first argument to {@code generateInserts}; appears to be a commit/instant time
     * @param numRecords number of records to generate
     * @return the path of the written file
     * @throws IOException if writing the file fails
     */
    Path generateOneFile(String fileName, String commitTime, int numRecords) throws IOException {
        Path filePath = new Path(this.dfsRoot, fileName + this.fileSuffix);
        writeNewDataToFile(this.dataGenerator.generateInserts(commitTime, numRecords), filePath);
        return filePath;
    }

    /**
     * Exercises the source end to end through {@link SourceFormatAdapter}. It covers empty input,
     * the source limit, Avro and Row fetches, resuming from checkpoints, replaying a checkpoint,
     * and re-reading from scratch.
     */
    @Test
    public void testReadingFromSource() throws IOException {
        dfs.mkdirs(new Path(this.dfsRoot));
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource());

        // 1. Nothing written yet: a fetch from scratch yields no batch.
        Assert.assertEquals(Option.empty(),
            sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // Write 100 records and make sure the file exceeds the 10-byte limit used in step 2.
        FileStatus firstFile = dfs.listFiles(generateOneFile("1", "000", 100), true).next();
        Assert.assertTrue(firstFile.getLen() > 10L);

        // 2. With a source limit smaller than the file, the test expects no batch.
        Assert.assertEquals(Option.empty(),
            sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());

        // 3. Unlimited fetch from scratch returns the 100 records in both Avro and Row form.
        InputBatch<? extends JavaRDD<?>> avroBatch1 =
            sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assert.assertEquals(100L, avroBatch1.getBatch().get().count());
        InputBatch<? extends Dataset<?>> rowBatch1 =
            sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Assert.assertEquals(100L, rowBatch1.getBatch().get().count());
        // The Avro batch must also convert to a DataFrame using the source schema. The raw cast
        // is needed because createDataFrame wants the concrete record type.
        Assert.assertEquals(100L, AvroConversionUtils.createDataFrame(
            JavaRDD.toRDD((JavaRDD) avroBatch1.getBatch().get()),
            this.schemaProvider.getSourceSchema().toString(), this.sparkSession).count());

        // 4. Write 10000 more records; resuming from the previous checkpoints returns only those.
        generateOneFile("2", "001", 10000);
        InputBatch<? extends JavaRDD<?>> avroBatch2 = sourceFormatAdapter.fetchNewDataInAvroFormat(
            Option.of(avroBatch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(10000L, avroBatch2.getBatch().get().count());
        InputBatch<? extends Dataset<?>> rowBatch2 = sourceFormatAdapter.fetchNewDataInRowFormat(
            Option.of(rowBatch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(10000L, rowBatch2.getBatch().get().count());

        // 5. Replaying the same checkpoint returns the same data and the same next checkpoint.
        InputBatch<? extends Dataset<?>> rowBatch2Replay = sourceFormatAdapter.fetchNewDataInRowFormat(
            Option.of(rowBatch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(10000L, rowBatch2Replay.getBatch().get().count());
        Assert.assertEquals(rowBatch2.getCheckpointForNextBatch(), rowBatch2Replay.getCheckpointForNextBatch());

        // The replayed rows must be queryable through SQL. Temp views are scoped to the
        // SparkSession that owns the Dataset, so query through that session rather than
        // whatever session a fresh builder would hand back.
        Dataset<?> replayedRows = rowBatch2Replay.getBatch().get();
        replayedRows.createOrReplaceTempView("test_dfs_table");
        Assert.assertEquals(10000L, replayedRows.sparkSession().sql("select * from test_dfs_table").count());

        // 6. Fetching from the latest checkpoint yields no new batch.
        Assert.assertEquals(Option.empty(), sourceFormatAdapter.fetchNewDataInAvroFormat(
            Option.of(avroBatch2.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());

        // 7. Fetching from scratch again returns every record written so far (100 + 10000).
        Assert.assertEquals(10100L, sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE)
            .getBatch().get().count());
    }
}
