package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestJsonDFSSource.class */
/**
 * Tests for {@link JsonDFSSource}, built on the shared DFS source test harness in
 * {@link AbstractDFSSourceTestBase}. Source files are written as JSON strings under
 * {@code basePath + "/jsonFiles"} with a {@code .json} suffix.
 */
public class TestJsonDFSSource extends AbstractDFSSourceTestBase {

    /**
     * Runs the base-class setup and then points this test at the JSON-specific
     * DFS root directory and file suffix.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        this.dfsRoot = basePath + "/jsonFiles";
        this.fileSuffix = ".json";
    }

    /**
     * Builds the {@link JsonDFSSource} under test.
     *
     * <p>Note that the supplied properties are mutated: the DFS root key is set on them
     * before the source is constructed.
     *
     * @param typedProperties properties handed to the source; receives the DFS root setting
     * @return a new {@link JsonDFSSource} reading from {@code dfsRoot}
     */
    @Override
    public Source prepareDFSSource(TypedProperties typedProperties) {
        typedProperties.setProperty("hoodie.streamer.source.dfs.root", this.dfsRoot);
        return new JsonDFSSource(typedProperties, jsc, sparkSession, this.schemaProvider);
    }

    /**
     * Converts the given records to JSON strings and saves them to the given path.
     *
     * @param list records to write
     * @param path destination file
     * @throws IOException if saving the strings fails
     */
    @Override
    public void writeNewDataToFile(List<HoodieRecord> list, Path path) throws IOException {
        UtilitiesTestBase.Helpers.saveStringsToDFS(UtilitiesTestBase.Helpers.jsonifyRecords(list), storage, path.toString());
    }

    /**
     * Verifies that, with {@code ROW_THROW_EXPLICIT_EXCEPTIONS} enabled, evaluating a fetched
     * row batch after one of its source files has been corrupted fails with a
     * {@link SchemaCompatibilityException} somewhere in the cause chain.
     *
     * @throws IOException if preparing or corrupting the source files fails
     */
    @Test
    public void testCorruptedSourceFile() throws IOException {
        fs.mkdirs(new Path(this.dfsRoot));
        TypedProperties props = new TypedProperties();
        props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true");
        Source source = prepareDFSSource(props);
        SourceFormatAdapter adapter = new SourceFormatAdapter(source, Option.empty(), Option.of(props));

        generateOneFile("1", "000", 10);
        generateOneFile("2", "000", 10);
        FileStatus thirdFile = (FileStatus) fs.listFiles(generateOneFile("3", "000", 10), true).next();

        // The batch is fetched BEFORE the file is corrupted; the failure is only expected
        // once the dataset is actually evaluated by show() below.
        InputBatch<?> batch = adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        corruptFile(thirdFile.getPath());
        Assertions.assertTrue(batch.getBatch().isPresent());

        Throwable thrown = Assertions.assertThrows(Exception.class, () -> {
            ((Dataset<?>) batch.getBatch().get()).show(30);
        });

        // Walk the cause chain looking for the expected exception type.
        for (Throwable cause = thrown; cause != null; cause = cause.getCause()) {
            if (cause instanceof SchemaCompatibilityException) {
                return;
            }
        }
        // Attach the exception that was actually thrown so the failure report shows
        // what happened instead of an AssertionError with no cause.
        throw new AssertionError("Exception does not have SchemaCompatibility in its trace", thrown);
    }

    /**
     * Corrupts the file at {@code path} by appending a line of non-JSON characters to it.
     *
     * @param path file to append the garbage line to
     * @throws IOException if the append stream cannot be opened
     */
    protected void corruptFile(Path path) throws IOException {
        // try-with-resources guarantees the append stream is closed even if a write fails.
        try (PrintStream printStream = new PrintStream((OutputStream) fs.appendFile(path).build())) {
            printStream.println("��\u200d");
            printStream.flush();
        }
    }
}
