package au.csiro.pathling.io;

import au.csiro.pathling.QueryHelpers;
import au.csiro.pathling.caching.Cacheable;
import au.csiro.pathling.config.Configuration;
import au.csiro.pathling.encoders.FhirEncoders;
import au.csiro.pathling.security.PathlingAuthority;
import au.csiro.pathling.security.ResourceAccess;
import au.csiro.pathling.utilities.Preconditions;
import io.delta.tables.DeltaTable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Enumerations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Profile({"(core | import) & !ga4gh"})
@Component
/**
 * Reads and writes FHIR resource data as Delta tables, one table per resource type. Table
 * locations are resolved via {@code PersistenceScheme.getTableUrl} from the configured warehouse
 * URL, database name and resource type.
 *
 * <p>Implements {@link Cacheable}: the cache key is the base-36 encoding of the timestamp (in
 * milliseconds) of the most recent Delta commit found, or empty if none could be found.
 */
public class Database implements Cacheable {

    private static final Logger log = LoggerFactory.getLogger(Database.class);

    /** Prefix stripped from incoming resource IDs by {@link #prepareResourceForUpdate}. */
    private static final String URN_UUID_PREFIX = "urn:uuid:";

    /**
     * Pattern that the final path component of an entry in the database location must match to be
     * considered a table (e.g. {@code Patient.parquet}).
     */
    private static final String TABLE_NAME_PATTERN = "^[^.]+\\.parquet$";

    /**
     * The current cache key. It is replaced by tasks running on {@link #executor} (see
     * {@link #invalidateCache}) and read via {@link #getCacheKey} / {@link #cacheKeyMatches},
     * potentially from other threads, so it is volatile to guarantee visibility of updates.
     */
    @Nonnull
    private volatile Optional<String> cacheKey;

    @Nonnull
    private final String warehouseUrl;

    @Nonnull
    private final String databaseName;

    @Nonnull
    private final Configuration configuration;

    @Nonnull
    protected final SparkSession spark;

    @Nonnull
    protected final FhirEncoders fhirEncoders;

    @Nonnull
    protected final ThreadPoolTaskExecutor executor;

    /**
     * @param configuration supplies the storage location and Spark-related settings
     * @param sparkSession the session used for all reads and writes
     * @param fhirEncoders used to encode resources and to build empty datasets
     * @param threadPoolTaskExecutor runs cache invalidation and compaction in the background
     */
    public Database(@Nonnull final Configuration configuration,
            @Nonnull final SparkSession sparkSession, @Nonnull final FhirEncoders fhirEncoders,
            @Nonnull final ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.configuration = configuration;
        this.spark = sparkSession;
        this.warehouseUrl = PersistenceScheme.convertS3ToS3aUrl(
                configuration.getStorage().getWarehouseUrl());
        this.databaseName = configuration.getStorage().getDatabaseName();
        this.fhirEncoders = fhirEncoders;
        this.executor = threadPoolTaskExecutor;
        // Must run after the assignments above: it reads warehouseUrl, databaseName and spark. As a
        // field initializer it would run before the constructor body and see null values.
        this.cacheKey = buildCacheKeyFromDatabase();
    }

    /**
     * Reads all data for a resource type.
     *
     * @param resourceType the resource type to read
     * @return the table contents, or an empty dataset if no Delta table exists yet
     */
    @ResourceAccess(PathlingAuthority.AccessType.READ)
    @Nonnull
    public Dataset<Row> read(@Nonnull final Enumerations.ResourceType resourceType) {
        return attemptDeltaLoad(resourceType)
                .map(DeltaTable::toDF)
                .orElseGet(() -> QueryHelpers.createEmptyDataset(this.spark, this.fhirEncoders,
                        resourceType));
    }

    /**
     * Replaces the entire table for a resource type with the supplied dataset.
     *
     * @param resourceType the resource type to overwrite
     * @param dataset the new contents of the table
     */
    @ResourceAccess(PathlingAuthority.AccessType.WRITE)
    public void overwrite(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final Dataset<Row> dataset) {
        write(resourceType, dataset);
    }

    /**
     * Inserts or updates (by ID) a single resource.
     *
     * @param resourceType the type of the resource
     * @param iBaseResource the resource to merge
     */
    @ResourceAccess(PathlingAuthority.AccessType.WRITE)
    public void merge(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final IBaseResource iBaseResource) {
        merge(resourceType, List.of(iBaseResource));
    }

    /**
     * Inserts or updates (by ID) a list of resources, which are first encoded into a dataset.
     *
     * @param resourceType the type of the resources
     * @param list the resources to merge
     */
    @ResourceAccess(PathlingAuthority.AccessType.WRITE)
    public void merge(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final List<IBaseResource> list) {
        merge(resourceType,
                this.spark.createDataset(list, this.fhirEncoders.of(resourceType.toCode())).toDF());
    }

    /**
     * Upserts the supplied rows into the table for the resource type. Rows whose {@code id} matches
     * an existing row replace it; all other rows are inserted. An empty table is created first if
     * none exists. Afterwards the cache is invalidated and a compaction may be scheduled.
     *
     * @param resourceType the resource type to update
     * @param dataset the rows to upsert
     */
    @ResourceAccess(PathlingAuthority.AccessType.WRITE)
    public void merge(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final Dataset<Row> dataset) {
        final DeltaTable table = readDelta(resourceType);
        log.debug("Writing updates: {}", resourceType.toCode());
        table.as("original")
                .merge(dataset.as("updates"), "original.id = updates.id")
                .whenMatched()
                .updateAll()
                .whenNotMatched()
                .insertAll()
                .execute();
        invalidateCache(PersistenceScheme.getTableUrl(this.warehouseUrl, this.databaseName,
                resourceType));
        compact(resourceType, table);
    }

    /**
     * Normalises a resource's ID prior to an update and checks that it matches the supplied ID. An
     * ID of the form {@code urn:uuid:<id>} is reduced to the bare {@code <id>}.
     *
     * @param iBaseResource the resource to prepare (mutated in place)
     * @param str the ID the resource is expected to have
     * @return the same resource instance
     * @throws RuntimeException (via {@code Preconditions.checkUserInput}) if the resource has no ID
     *     or its ID does not match {@code str}
     */
    @Nonnull
    public static IBaseResource prepareResourceForUpdate(@Nonnull final IBaseResource iBaseResource,
            @Nonnull final String str) {
        final String originalId = iBaseResource.getIdElement().getIdPart();
        // getIdPart() may be null when the resource carries no ID; guard so that case reaches the
        // user-input check below instead of failing with a NullPointerException.
        if (originalId != null && originalId.startsWith(URN_UUID_PREFIX)) {
            iBaseResource.setId(originalId.substring(URN_UUID_PREFIX.length()));
        }
        // Compare from the non-null side so a missing ID yields the intended user error.
        Preconditions.checkUserInput(str.equals(iBaseResource.getIdElement().getIdPart()),
                "Resource ID missing or does not match supplied ID");
        return iBaseResource;
    }

    /**
     * @param str a cache key to compare against
     * @return true only if a current cache key is present and equal to {@code str}
     */
    @Override // au.csiro.pathling.caching.Cacheable
    public boolean cacheKeyMatches(@Nonnull final String str) {
        return this.cacheKey.map(str::equals).orElse(false);
    }

    /**
     * Loads the Delta table for a resource type, first writing an empty table if none exists.
     */
    @Nonnull
    DeltaTable readDelta(@Nonnull final Enumerations.ResourceType resourceType) {
        return attemptDeltaLoad(resourceType)
                .orElseGet(() -> getDeltaTable(resourceType, writeEmpty(resourceType)));
    }

    /**
     * Loads the Delta table for a resource type if its location holds a Delta table.
     */
    @Nonnull
    private Optional<DeltaTable> attemptDeltaLoad(
            @Nonnull final Enumerations.ResourceType resourceType) {
        final String tableUrl = PersistenceScheme.getTableUrl(this.warehouseUrl, this.databaseName,
                resourceType);
        if (!DeltaTable.isDeltaTable(this.spark, tableUrl)) {
            return Optional.empty();
        }
        return Optional.of(getDeltaTable(resourceType, tableUrl));
    }

    /**
     * Opens the Delta table at {@code str}, caching its DataFrame when dataset caching is enabled
     * in the Spark configuration.
     */
    @Nonnull
    private DeltaTable getDeltaTable(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final String str) {
        log.info("Loading resource {} from: {}", resourceType.toCode(), str);
        final DeltaTable table = DeltaTable.forPath(this.spark, str);
        Preconditions.checkNotNull(table);
        if (this.configuration.getSpark().getCacheDatasets()) {
            log.debug("Caching resource dataset: {}", resourceType.toCode());
            table.toDF().cache();
        }
        return table;
    }

    /**
     * Overwrites the table for a resource type (schema included) with the dataset, sorted by
     * {@code id}, then invalidates the cache.
     */
    void write(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final Dataset<Row> dataset) {
        final String tableUrl = PersistenceScheme.getTableUrl(this.warehouseUrl, this.databaseName,
                resourceType);
        log.debug("Overwriting: {}", tableUrl);
        dataset.orderBy(functions.asc("id"))
                .write()
                .format("delta")
                .mode(SaveMode.Overwrite)
                .option("overwriteSchema", "true")
                .save(tableUrl);
        invalidateCache(tableUrl);
    }

    /**
     * Writes an empty dataset for the resource type.
     *
     * @return the URL of the table that was written
     */
    @Nonnull
    String writeEmpty(@Nonnull final Enumerations.ResourceType resourceType) {
        final Dataset<Row> empty = QueryHelpers.createEmptyDataset(this.spark, this.fhirEncoders,
                resourceType);
        log.debug("Writing empty dataset: {}", resourceType.toCode());
        write(resourceType, empty);
        return PersistenceScheme.getTableUrl(this.warehouseUrl, this.databaseName, resourceType);
    }

    /**
     * Schedules a background rewrite of the table (flagged with {@code dataChange=false}) when its
     * partition count exceeds the configured compaction threshold.
     */
    private void compact(@Nonnull final Enumerations.ResourceType resourceType,
            @Nonnull final DeltaTable deltaTable) {
        final int compactionThreshold = this.configuration.getSpark().getCompactionThreshold();
        final int numPartitions = deltaTable.toDF().rdd().getNumPartitions();
        if (numPartitions <= compactionThreshold) {
            log.debug("Compaction not needed (number of partitions: {}, threshold: {})",
                    numPartitions, compactionThreshold);
            return;
        }
        final String tableUrl = PersistenceScheme.getTableUrl(this.warehouseUrl, this.databaseName,
                resourceType);
        log.debug("Scheduling table compaction (number of partitions: {}, threshold: {}): {}",
                numPartitions, compactionThreshold, tableUrl);
        this.executor.submit(() -> {
            log.debug("Commencing compaction: {}", tableUrl);
            try {
                read(resourceType)
                        .repartition()
                        .write()
                        .option("dataChange", "false")
                        .format("delta")
                        .mode(SaveMode.Overwrite)
                        .save(tableUrl);
                log.debug("Compaction complete: {}", tableUrl);
            } catch (final Exception e) {
                // The returned Future is never inspected, so without this a failure would vanish.
                // Exception (not RuntimeException) because Spark can throw undeclared checked ones.
                log.warn("Compaction failed: {}", tableUrl, e);
            }
        });
    }

    /**
     * Asynchronously recomputes the cache key from the given table and clears Spark's SQL cache.
     */
    private void invalidateCache(@Nonnull final String str) {
        this.executor.execute(() -> {
            this.cacheKey = buildCacheKeyFromTable(str);
            this.spark.sqlContext().clearCache();
        });
    }

    @Nonnull
    private Optional<String> buildCacheKeyFromDatabase() {
        return latestUpdateToDatabase().map(this::cacheKeyFromTimestamp);
    }

    /**
     * Finds the most recent commit timestamp across all Delta tables in the database location.
     *
     * @return the latest timestamp in milliseconds, or empty if the warehouse or database cannot be
     *     accessed, does not exist, or contains no tables with history
     */
    @Nonnull
    private Optional<Long> latestUpdateToDatabase() {
        final String databasePath = this.warehouseUrl + "/" + this.databaseName;
        log.info("Querying latest snapshot from database: {}", databasePath);
        final org.apache.hadoop.conf.Configuration hadoopConfiguration =
                this.spark.sparkContext().hadoopConfiguration();
        Preconditions.checkNotNull(hadoopConfiguration);

        final FileSystem warehouse;
        try {
            warehouse = FileSystem.get(new URI(this.warehouseUrl), hadoopConfiguration);
        } catch (final IOException | URISyntaxException e) {
            log.debug("Unable to access warehouse location, returning empty snapshot time: {}",
                    this.warehouseUrl);
            return Optional.empty();
        }
        Preconditions.checkNotNull(warehouse);

        final Path databaseLocation = new Path(databasePath);
        final FileStatus[] fileStatuses;
        try {
            if (!warehouse.exists(databaseLocation)) {
                log.debug("Database location does not exist, returning empty snapshot time: {}",
                        databasePath);
                return Optional.empty();
            }
            fileStatuses = warehouse.listStatus(databaseLocation);
        } catch (final IOException e) {
            log.debug("Unable to access database location, returning empty snapshot time: {}",
                    databasePath);
            return Optional.empty();
        }
        Preconditions.checkNotNull(fileStatuses);

        return Arrays.stream(fileStatuses)
                .map(fileStatus -> {
                    final Path path = fileStatus.getPath();
                    Preconditions.checkNotNull(path);
                    return path;
                })
                // Match on the final path component only: the full URL may legitimately contain
                // dots (e.g. a bucket or directory name), which would otherwise exclude every table.
                .filter(path -> path.getName().matches(TABLE_NAME_PATTERN))
                .map(Path::toString)
                .filter(tableUrl -> DeltaTable.isDeltaTable(this.spark, tableUrl))
                .map(this::latestUpdateToTable)
                .flatMap(Optional::stream)
                .max(Long::compare);
    }

    @Nonnull
    private Optional<String> buildCacheKeyFromTable(@Nonnull final String str) {
        return latestUpdateToTable(str).map(this::cacheKeyFromTimestamp);
    }

    /**
     * @return the timestamp (milliseconds) of the highest-version commit in the table's Delta
     *     history, or empty if the history has no rows
     */
    @Nonnull
    private Optional<Long> latestUpdateToTable(@Nonnull final String str) {
        log.debug("Querying latest snapshot for table: {}", str);
        final Row[] latest = (Row[]) DeltaTable.forPath(this.spark, str)
                .history()
                .orderBy(functions.desc("version"))
                .select("timestamp")
                .head(1);
        if (latest.length != 1) {
            return Optional.empty();
        }
        return Optional.of(latest[0].getTimestamp(0).getTime());
    }

    /** Encodes a timestamp as a base-36 string for use as a cache key. */
    @Nonnull
    private String cacheKeyFromTimestamp(@Nonnull final Long l) {
        return Long.toString(l, 36);
    }

    @Override // au.csiro.pathling.caching.Cacheable
    @Nonnull
    public Optional<String> getCacheKey() {
        return this.cacheKey;
    }
}
