package au.csiro.pathling.library.io;

import au.csiro.pathling.encoders.FhirEncoders;
import au.csiro.pathling.io.ImportMode;
import au.csiro.pathling.library.PathlingContext;
import au.csiro.pathling.library.TestHelpers;
import au.csiro.pathling.library.io.source.DatasetSource;
import au.csiro.pathling.library.io.source.DeltaSource;
import au.csiro.pathling.library.io.source.NdjsonSource;
import au.csiro.pathling.library.io.source.ParquetSource;
import au.csiro.pathling.library.io.source.QueryableDataSource;
import au.csiro.pathling.terminology.TerminologyServiceFactory;
import au.csiro.pathling.test.assertions.DatasetAssert;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.r4.model.Enumerations;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:au/csiro/pathling/library/io/DataSourcesTest.class */
/**
 * Exercises the data sources reachable through {@code PathlingContext.read()} (NDJSON, bundles,
 * datasets, Parquet, Delta and catalog tables) by reading the fixtures under
 * {@link #TEST_DATA_PATH}, optionally writing them back out to a temporary directory, and running
 * aggregate/extract queries against the result.
 */
class DataSourcesTest {
    private static final Logger log = LoggerFactory.getLogger(DataSourcesTest.class);

    /** Absolute, normalised location of the test fixtures. */
    static final Path TEST_DATA_PATH = Path.of("src/test/resources/test-data").toAbsolutePath().normalize();

    static PathlingContext pathlingContext;
    static SparkSession spark;
    static Path temporaryDirectory;

    /**
     * Creates the temporary output directory, a Spark session whose warehouse lives inside that
     * directory, the {@code test} database, and the {@link PathlingContext} shared by all tests.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @BeforeAll
    public static void setupContext() throws IOException {
        temporaryDirectory = Files.createTempDirectory("pathling-datasources-test");
        log.info("Created temporary directory: {}", temporaryDirectory);
        spark = TestHelpers.sparkBuilder()
                .config("spark.sql.catalogImplementation", "hive")
                .config("spark.sql.warehouse.dir", temporaryDirectory.resolve("spark-warehouse").toString())
                .getOrCreate();
        spark.sql("CREATE DATABASE IF NOT EXISTS test");
        // The terminology service is never exercised here, so a serializable mock is sufficient.
        final TerminologyServiceFactory terminologyServiceFactory =
                Mockito.mock(TerminologyServiceFactory.class, Mockito.withSettings().serializable());
        pathlingContext = PathlingContext.create(spark, FhirEncoders.forR4().getOrCreate(), terminologyServiceFactory);
    }

    /**
     * Stops the Spark session and removes the temporary directory.
     *
     * <p>The directory is deleted in a {@code finally} block so that it is cleaned up even when
     * stopping Spark fails, and both resources are null-guarded in case setup did not complete.
     *
     * @throws IOException if the temporary directory cannot be deleted
     */
    @AfterAll
    public static void tearDownAll() throws IOException {
        try {
            if (spark != null) {
                spark.stop();
            }
        } finally {
            if (temporaryDirectory != null) {
                FileUtils.deleteDirectory(temporaryDirectory.toFile());
            }
        }
    }

    /** Reads the NDJSON fixtures, writes them back out as NDJSON, and queries both copies. */
    @Test
    void ndjsonReadWrite() {
        final NdjsonSource ndjson = pathlingContext.read().ndjson(testData("ndjson"));
        queryNdjsonData(ndjson);
        ndjson.write().ndjson(tempLocation("ndjson"), "error");
        queryNdjsonData(pathlingContext.read().ndjson(tempLocation("ndjson")));
    }

    /** Reads NDJSON fixtures that use the {@code jsonl} file extension. */
    @Test
    void ndjsonWithExtension() {
        queryNdjsonData(pathlingContext.read().ndjson(testData("jsonl"), "jsonl"));
    }

    /** Reads the fixtures stored under {@code ndjson-qualified}. */
    @Test
    void ndjsonReadQualified() {
        queryNdjsonData(pathlingContext.read().ndjson(testData("ndjson-qualified")));
    }

    /**
     * Reads and writes NDJSON using custom file name mappers that strip the first occurrence of
     * {@code Custom} from the name they are given.
     */
    @Test
    void ndjsonReadWriteCustom() {
        // Explicit type arguments are required: with a raw Function the lambda parameter would be
        // an Object and replaceFirst would not resolve.
        final Function<String, Set<String>> fileNameMapper =
                baseName -> Collections.singleton(baseName.replaceFirst("Custom", ""));
        final NdjsonSource ndjson = pathlingContext.read().ndjson(testData("ndjson-custom"), "ndjson", fileNameMapper);
        queryNdjsonData(ndjson);
        ndjson.write().ndjson(tempLocation("ndjson-custom"), "error", name -> name.replaceFirst("Custom", ""));
        queryNdjsonData(pathlingContext.read().ndjson(tempLocation("ndjson-custom"), "ndjson", fileNameMapper));
    }

    /** Runs an extract query against the NDJSON fixtures. */
    @Test
    void ndjsonWithExtract() {
        extractNdjsonData(pathlingContext.read().ndjson(testData("ndjson")));
    }

    /** Reads Patient and Condition resources from the bundle fixtures. */
    @Test
    void bundlesRead() {
        queryBundlesData(pathlingContext.read()
                .bundles(testData("bundles"), Set.of("Patient", "Condition"), "application/fhir+json"));
    }

    /** Registers Delta-backed Spark datasets directly as a data source and queries them. */
    @Test
    void datasetsRead() {
        final Path deltaFixtures = TEST_DATA_PATH.resolve("delta");
        final Dataset<Row> conditions =
                spark.read().format("delta").load(deltaFixtures.resolve("Condition.parquet").toString());
        final Dataset<Row> patients =
                spark.read().format("delta").load(deltaFixtures.resolve("Patient.parquet").toString());
        queryDeltaData(pathlingContext.read().datasets()
                .dataset(Enumerations.ResourceType.CONDITION, conditions)
                .dataset(Enumerations.ResourceType.PATIENT, patients));
    }

    /** Reads the Parquet fixtures, writes them back out as Parquet, and queries both copies. */
    @Test
    void parquetReadWrite() {
        final ParquetSource parquet = pathlingContext.read().parquet(testData("parquet"));
        queryParquetData(parquet);
        parquet.write().parquet(tempLocation("parquet"), "error");
        queryParquetData(pathlingContext.read().parquet(tempLocation("parquet")));
    }

    /** Reads the Delta fixtures, writes them back out as Delta, and queries both copies. */
    @Test
    void deltaReadWrite() {
        final DeltaSource delta = pathlingContext.read().delta(testData("delta"));
        queryDeltaData(delta);
        delta.write().delta(tempLocation("delta"));
        queryDeltaData(pathlingContext.read().delta(tempLocation("delta")));
    }

    /**
     * Same round trip as {@link #deltaReadWrite()}, but writing with the merge import mode.
     *
     * <p>NOTE(review): this writes to the same {@code delta} output directory as
     * {@link #deltaReadWrite()} - confirm the two tests are independent of execution order.
     */
    @Test
    void deltaReadWriteWithMerge() {
        final DeltaSource delta = pathlingContext.read().delta(testData("delta"));
        queryDeltaData(delta);
        delta.write().delta(tempLocation("delta"), ImportMode.MERGE.getCode());
        queryDeltaData(pathlingContext.read().delta(tempLocation("delta")));
    }

    /** Writes the NDJSON fixtures to catalog tables and queries them back. */
    @Test
    void tablesReadWrite() {
        final NdjsonSource ndjson = pathlingContext.read().ndjson(testData("ndjson"));
        queryNdjsonData(ndjson);
        ndjson.write().tables();
        queryNdjsonData(pathlingContext.read().tables());
    }

    /** Writes the NDJSON fixtures to catalog tables using the merge import mode. */
    @Test
    void tablesReadWriteWithImportMode() {
        pathlingContext.read().ndjson(testData("ndjson")).write().tables(ImportMode.MERGE.getCode());
        queryNdjsonData(pathlingContext.read().tables());
    }

    /** Writes the NDJSON fixtures to tables in the {@code test} schema using the overwrite mode. */
    @Test
    void tablesReadWriteWithImportModeAndSchema() {
        pathlingContext.read().ndjson(testData("ndjson")).write().tables(ImportMode.OVERWRITE.getCode(), "test");
        queryNdjsonData(pathlingContext.read().tables("test"));
    }

    /** Reading a resource type from a dataset source with no datasets registered must fail. */
    @Test
    void readNonExistentResource() {
        final DatasetSource datasets = pathlingContext.read().datasets();
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> datasets.read(Enumerations.ResourceType.PATIENT));
    }

    /** A malformed file URI must surface as a runtime exception caused by a URISyntaxException. */
    @Test
    void readInvalidUri() {
        final RuntimeException thrown = Assertions.assertThrows(RuntimeException.class,
                () -> pathlingContext.read().ndjson("file:\\\\non-existent"));
        Assertions.assertTrue(thrown.getCause() instanceof URISyntaxException);
    }

    /**
     * Asserts that the source holds exactly the Patient and Condition resource types, and checks
     * the patient count grouped by gender and the total condition count.
     *
     * @param queryableDataSource the data source under test
     */
    private static void queryNdjsonData(@Nonnull QueryableDataSource queryableDataSource) {
        assertPatientAndConditionOnly(queryableDataSource);

        final Row[] expectedPatientsByGender = {
            RowFactory.create("female", 4),
            RowFactory.create("male", 5)
        };
        DatasetAssert.of(queryableDataSource.aggregate(Enumerations.ResourceType.PATIENT)
                .aggregation("count()")
                .grouping("gender")
                .execute()).hasRows(expectedPatientsByGender);

        final Row[] expectedConditionCount = {RowFactory.create(71)};
        DatasetAssert.of(queryableDataSource.aggregate(Enumerations.ResourceType.CONDITION)
                .aggregation("count()")
                .execute()).hasRows(expectedConditionCount);
    }

    /**
     * Asserts that the source holds exactly the Patient and Condition resource types, and checks
     * the count of female patients and the total condition count for the bundle fixtures.
     *
     * @param queryableDataSource the data source under test
     */
    private static void queryBundlesData(@Nonnull QueryableDataSource queryableDataSource) {
        assertPatientAndConditionOnly(queryableDataSource);

        final Row[] expectedFemalePatients = {RowFactory.create(6)};
        DatasetAssert.of(queryableDataSource.aggregate(Enumerations.ResourceType.PATIENT)
                .aggregation("count()")
                .filter("gender = 'female'")
                .execute()).hasRows(expectedFemalePatients);

        final Row[] expectedConditionCount = {RowFactory.create(246)};
        DatasetAssert.of(queryableDataSource.aggregate(Enumerations.ResourceType.CONDITION)
                .aggregation("count()")
                .execute()).hasRows(expectedConditionCount);
    }

    /** Applies the same expectations as {@link #queryNdjsonData(QueryableDataSource)}. */
    private static void queryDeltaData(@Nonnull QueryableDataSource queryableDataSource) {
        queryNdjsonData(queryableDataSource);
    }

    /** Applies the same expectations as {@link #queryNdjsonData(QueryableDataSource)}. */
    private static void queryParquetData(@Nonnull QueryableDataSource queryableDataSource) {
        queryNdjsonData(queryableDataSource);
    }

    /**
     * Extracts the id, gender and postal code of one specific patient and checks the single
     * resulting row.
     *
     * @param queryableDataSource the data source under test
     */
    private static void extractNdjsonData(@Nonnull QueryableDataSource queryableDataSource) {
        final Row[] expectedPatient = {
            RowFactory.create("beff242e-580b-47c0-9844-c1a68c36c5bf", "male", "02138")
        };
        DatasetAssert.of(queryableDataSource.extract(Enumerations.ResourceType.PATIENT)
                .column("id", "Patient ID")
                .column("gender")
                .column("address.postalCode")
                .filter("id = 'beff242e-580b-47c0-9844-c1a68c36c5bf'")
                .limit(1)
                .execute()).hasRows(expectedPatient);
    }

    /**
     * Reading from an {@code s3://} URI is expected to fail with a runtime exception whose cause
     * is a ClassNotFoundException naming the S3A file system class.
     */
    @Test
    void testS3Uri() {
        final RuntimeException exc = Assertions.assertThrows(RuntimeException.class,
                () -> pathlingContext.read().ndjson("s3://pathling-test-data/ndjson/"));
        Assertions.assertTrue(exc.getCause() instanceof ClassNotFoundException);
        Assertions.assertEquals("Class org.apache.hadoop.fs.s3a.S3AFileSystem not found", exc.getCause().getMessage());
    }

    /** Asserts that the source exposes exactly two resource types: Patient and Condition. */
    private static void assertPatientAndConditionOnly(@Nonnull QueryableDataSource queryableDataSource) {
        Assertions.assertEquals(2, queryableDataSource.getResourceTypes().size());
        Assertions.assertTrue(queryableDataSource.getResourceTypes().contains(Enumerations.ResourceType.PATIENT));
        Assertions.assertTrue(queryableDataSource.getResourceTypes().contains(Enumerations.ResourceType.CONDITION));
    }

    /** Resolves a fixture directory name against {@link #TEST_DATA_PATH} and returns it as a string. */
    private static String testData(String name) {
        return TEST_DATA_PATH.resolve(name).toString();
    }

    /** Resolves an output directory name against the temporary directory and returns it as a string. */
    private static String tempLocation(String name) {
        return temporaryDirectory.resolve(name).toString();
    }
}
