package au.csiro.pathling.io;

import au.csiro.pathling.QueryHelpers;
import au.csiro.pathling.config.StorageConfiguration;
import au.csiro.pathling.encoders.FhirEncoders;
import au.csiro.pathling.fhir.FhirUtils;
import au.csiro.pathling.io.source.DataSource;
import au.csiro.pathling.security.ResourceAccess;
import au.csiro.pathling.utilities.Preconditions;
import io.delta.tables.DeltaTable;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Enumerations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:au/csiro/pathling/io/Database.class */
/**
 * A {@link DataSource} that reads and writes FHIR resource data as Delta tables, with the
 * location and lifecycle of each table delegated to a {@link PersistenceScheme}.
 */
public class Database implements DataSource {
    private static final Logger log = LoggerFactory.getLogger(Database.class);

    /** URN prefix stripped from resource IDs by {@link #prepareResourceForUpdate}. */
    private static final String UUID_URN_PREFIX = "urn:uuid:";

    /** Column used both for ordering written data and for matching rows during a merge. */
    private static final String ID_COLUMN = "id";

    @Nonnull
    protected final SparkSession spark;

    @Nonnull
    protected final FhirEncoders fhirEncoders;

    @Nonnull
    protected final PersistenceScheme persistence;
    protected final boolean cacheDatasets;

    /**
     * Creates a database on top of the supplied persistence scheme.
     *
     * @param sparkSession the Spark session used to create datasets
     * @param fhirEncoders the encoders used to build resource datasets
     * @param persistenceScheme the scheme that reads, writes, merges and lists the tables
     * @param z whether {@link Dataset#cache()} is called on each table when it is loaded
     */
    public Database(@Nonnull SparkSession sparkSession, @Nonnull FhirEncoders fhirEncoders, @Nonnull PersistenceScheme persistenceScheme, boolean z) {
        this.spark = sparkSession;
        this.fhirEncoders = fhirEncoders;
        this.persistence = persistenceScheme;
        this.cacheDatasets = z;
    }

    /**
     * Creates a file system backed database from a storage configuration. The storage path is
     * the configuration's warehouse URL joined with its database name.
     *
     * @param sparkSession the Spark session used to create datasets
     * @param fhirEncoders the encoders used to build resource datasets
     * @param storageConfiguration supplies the warehouse URL, database name and cache setting
     * @return a new database instance
     */
    @Nonnull
    public static Database forConfiguration(@Nonnull SparkSession sparkSession, @Nonnull FhirEncoders fhirEncoders, @Nonnull StorageConfiguration storageConfiguration) {
        return new Database(
                sparkSession,
                fhirEncoders,
                new FileSystemPersistence(
                        sparkSession,
                        FileSystemPersistence.safelyJoinPaths(
                                storageConfiguration.getWarehouseUrl(),
                                storageConfiguration.getDatabaseName())),
                storageConfiguration.getCacheDatasets().booleanValue());
    }

    /**
     * Creates a database backed by a {@link FileSystemPersistence} at the supplied location.
     *
     * @param sparkSession the Spark session used to create datasets
     * @param fhirEncoders the encoders used to build resource datasets
     * @param str the location handed to {@link FileSystemPersistence}
     * @param z whether loaded tables are cached
     * @return a new database instance
     */
    @Nonnull
    public static Database forFileSystem(@Nonnull SparkSession sparkSession, @Nonnull FhirEncoders fhirEncoders, @Nonnull String str, boolean z) {
        return new Database(sparkSession, fhirEncoders, new FileSystemPersistence(sparkSession, str), z);
    }

    /**
     * Creates a database backed by a {@link CatalogPersistence}.
     *
     * @param sparkSession the Spark session used to create datasets
     * @param fhirEncoders the encoders used to build resource datasets
     * @param optional optional value handed to {@link CatalogPersistence}; presumably a schema
     *     name (TODO confirm against {@code CatalogPersistence})
     * @param z whether loaded tables are cached
     * @return a new database instance
     */
    @Nonnull
    public static Database forCatalog(@Nonnull SparkSession sparkSession, @Nonnull FhirEncoders fhirEncoders, @Nonnull Optional<String> optional, boolean z) {
        return new Database(sparkSession, fhirEncoders, new CatalogPersistence(sparkSession, optional), z);
    }

    /**
     * Reads the data for a resource type. When the persistence scheme reports that no table
     * exists for the type, an empty dataset is returned instead.
     *
     * @param resourceType the resource type to read; must not be null
     * @return the stored data, or an empty dataset when nothing is stored
     * @throws NullPointerException if {@code resourceType} is null
     */
    @Override // au.csiro.pathling.io.source.DataSource
    @ResourceAccess(ResourceAccess.AccessType.READ)
    @Nonnull
    public Dataset<Row> read(@Nullable Enumerations.ResourceType resourceType) {
        return getMaybeNonExistentDeltaTable(Objects.requireNonNull(resourceType))
                .map(DeltaTable::toDF)
                .orElseGet(() -> QueryHelpers.createEmptyDataset(this.spark, this.fhirEncoders, resourceType));
    }

    /**
     * Reads the data for a resource type identified by a string, which is resolved through
     * {@link FhirUtils#getResourceType}.
     *
     * @param str the string identifying the resource type
     * @return the stored data, or an empty dataset when nothing is stored
     */
    @Override // au.csiro.pathling.io.source.DataSource
    @Nonnull
    public Dataset<Row> read(@Nullable String str) {
        return read(FhirUtils.getResourceType(str));
    }

    /**
     * Lists the resource types reported by the persistence scheme.
     *
     * @return the set of resource types
     */
    @Override // au.csiro.pathling.io.source.DataSource
    @Nonnull
    public Set<Enumerations.ResourceType> getResourceTypes() {
        return this.persistence.list();
    }

    /**
     * Replaces the stored data for a resource type with the supplied dataset, then asks the
     * persistence scheme to invalidate that resource type.
     *
     * @param resourceType the resource type to overwrite
     * @param dataset the new contents
     */
    @ResourceAccess(ResourceAccess.AccessType.WRITE)
    public void overwrite(@Nonnull Enumerations.ResourceType resourceType, @Nonnull Dataset<Row> dataset) {
        write(resourceType, dataset);
        this.persistence.invalidate(resourceType);
    }

    /**
     * Merges a single resource into the stored data for its type.
     *
     * @param resourceType the type of the resource
     * @param iBaseResource the resource to insert or update
     */
    @ResourceAccess(ResourceAccess.AccessType.WRITE)
    public void merge(@Nonnull Enumerations.ResourceType resourceType, @Nonnull IBaseResource iBaseResource) {
        merge(resourceType, List.of(iBaseResource));
    }

    /**
     * Merges a list of resources into the stored data for their type. The list is first encoded
     * into a dataset using the encoder for the resource type code.
     *
     * @param resourceType the type of the resources
     * @param list the resources to insert or update
     */
    @ResourceAccess(ResourceAccess.AccessType.WRITE)
    public void merge(@Nonnull Enumerations.ResourceType resourceType, @Nonnull List<IBaseResource> list) {
        merge(resourceType, this.spark.createDataset(list, this.fhirEncoders.of(resourceType.toCode())).toDF());
    }

    /**
     * Merges a dataset into the stored data for a resource type. Rows whose {@code id} matches
     * an existing row update it; all other rows are inserted. The table is created empty first
     * if it does not exist, and the resource type is invalidated afterwards.
     *
     * @param resourceType the type of the resources
     * @param dataset the rows to insert or update
     */
    @ResourceAccess(ResourceAccess.AccessType.WRITE)
    public void merge(@Nonnull Enumerations.ResourceType resourceType, @Nonnull Dataset<Row> dataset) {
        final DeltaTable original = readDelta(resourceType);
        log.debug("Writing updates: {}", resourceType.toCode());
        this.persistence.merge(
                resourceType,
                original.as("original")
                        .merge(dataset.as("updates"), "original.id = updates.id")
                        .whenMatched()
                        .updateAll()
                        .whenNotMatched()
                        .insertAll());
        this.persistence.invalidate(resourceType);
    }

    /**
     * Checks that a resource is suitable for an update against the supplied ID. An ID part that
     * starts with {@code urn:uuid:} has that prefix removed before the comparison.
     *
     * <p>A resource with no ID part is reported through {@link Preconditions#checkUserInput}
     * with the same message as a mismatching ID, rather than failing with a
     * {@link NullPointerException}.
     *
     * @param iBaseResource the resource to check; its ID may be rewritten in place
     * @param str the ID the resource is expected to carry
     * @return the same resource instance
     */
    @Nonnull
    public static IBaseResource prepareResourceForUpdate(@Nonnull IBaseResource iBaseResource, @Nonnull String str) {
        final String suppliedIdPart = iBaseResource.getIdElement().getIdPart();
        // The null guard keeps a resource without an ID on the validation path below.
        if (suppliedIdPart != null && suppliedIdPart.startsWith(UUID_URN_PREFIX)) {
            iBaseResource.setId(suppliedIdPart.substring(UUID_URN_PREFIX.length()));
        }
        // Re-read the ID, as it may have been rewritten above.
        final String effectiveIdPart = iBaseResource.getIdElement().getIdPart();
        Preconditions.checkUserInput(
                effectiveIdPart != null && effectiveIdPart.equals(str),
                "Resource ID missing or does not match supplied ID");
        return iBaseResource;
    }

    /**
     * Returns the Delta table for a resource type, writing an empty table first when the
     * persistence scheme reports that none exists.
     *
     * @param resourceType the resource type to load
     * @return the Delta table
     */
    @Nonnull
    DeltaTable readDelta(@Nonnull Enumerations.ResourceType resourceType) {
        return getMaybeNonExistentDeltaTable(resourceType).orElseGet(() -> {
            writeEmpty(resourceType);
            return getDeltaTable(resourceType);
        });
    }

    /**
     * Returns the Delta table for a resource type if the persistence scheme reports it exists.
     *
     * @param resourceType the resource type to look up
     * @return the table, or empty when it does not exist
     */
    @Nonnull
    private Optional<DeltaTable> getMaybeNonExistentDeltaTable(@Nonnull Enumerations.ResourceType resourceType) {
        if (!this.persistence.exists(resourceType)) {
            return Optional.empty();
        }
        return Optional.of(getDeltaTable(resourceType));
    }

    /**
     * Loads the Delta table for a resource type from the persistence scheme, calling
     * {@link Dataset#cache()} on its data frame when caching is enabled.
     *
     * @param resourceType the resource type to load
     * @return the Delta table
     */
    @Nonnull
    DeltaTable getDeltaTable(@Nonnull Enumerations.ResourceType resourceType) {
        final DeltaTable table = this.persistence.read(resourceType);
        if (this.cacheDatasets) {
            log.debug("Caching resource dataset: {}", resourceType.toCode());
            table.toDF().cache();
        }
        return table;
    }

    /**
     * Overwrites the stored data for a resource type. Rows are ordered by {@code id}, written
     * in Delta format with {@link SaveMode#Overwrite}, and the {@code overwriteSchema} option
     * is set to {@code true}.
     *
     * @param resourceType the resource type to write
     * @param dataset the rows to write
     */
    void write(@Nonnull Enumerations.ResourceType resourceType, @Nonnull Dataset<Row> dataset) {
        log.debug("Overwriting: {}", resourceType.toCode());
        this.persistence.write(
                resourceType,
                dataset.orderBy(functions.asc(ID_COLUMN))
                        .write()
                        .format("delta")
                        .mode(SaveMode.Overwrite)
                        .option("overwriteSchema", "true"));
    }

    /**
     * Writes an empty dataset for a resource type, replacing anything already stored.
     *
     * @param resourceType the resource type to write
     */
    void writeEmpty(@Nonnull Enumerations.ResourceType resourceType) {
        final Dataset<Row> empty = QueryHelpers.createEmptyDataset(this.spark, this.fhirEncoders, resourceType);
        log.debug("Writing empty dataset: {}", resourceType.toCode());
        write(resourceType, empty);
    }
}
