package au.csiro.pathling.io;

import au.csiro.pathling.caching.Cacheable;
import io.delta.tables.DeltaTable;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.hl7.fhir.r4.model.Enumerations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/* loaded from: input_file:au/csiro/pathling/io/CacheableFileSystemPersistence.class */
/**
 * A {@link FileSystemPersistence} that also exposes a cache key through {@link Cacheable}.
 *
 * <p>The cache key is the base-36 rendering of the timestamp (epoch milliseconds) of the most
 * recent entry in a Delta table's history. It is computed across every table in the database at
 * construction time, and recomputed from the affected table whenever a resource type is
 * invalidated. Invalidation also schedules a compaction of the table on the supplied executor when
 * its partition count exceeds the configured threshold.
 */
public class CacheableFileSystemPersistence extends FileSystemPersistence implements Cacheable {
    private static final Logger log = LoggerFactory.getLogger(CacheableFileSystemPersistence.class);

    /**
     * Matches the file name (final path component) of a table: a dot-free base name followed by
     * {@code .parquet}. Compiled once rather than on every directory entry.
     */
    private static final Pattern TABLE_FILE_NAME = Pattern.compile("^[^.]+\\.parquet$");

    @Nonnull
    private final ThreadPoolTaskExecutor executor;
    private final int compactionThreshold;

    /**
     * Current cache key, empty when no table history could be read.
     *
     * <p>Declared {@code volatile} because it is reassigned from a task running on
     * {@link #executor} while being read from the threads that call {@link #getCacheKey()} and
     * {@link #cacheKeyMatches(String)}.
     */
    @Nonnull
    private volatile Optional<String> cacheKey;

    /**
     * Creates the persistence scheme and computes the initial cache key from the database.
     *
     * @param sparkSession the Spark session, forwarded to the superclass
     * @param path forwarded to the superclass constructor; presumably the database location that
     *     is later read back through {@code this.path} (confirm against the superclass)
     * @param executor the executor used for asynchronous cache key refresh and compaction
     * @param compactionThreshold the number of partitions above which a table is compacted
     */
    public CacheableFileSystemPersistence(@Nonnull SparkSession sparkSession, @Nonnull String path, @Nonnull ThreadPoolTaskExecutor executor, int compactionThreshold) {
        super(sparkSession, path);
        this.executor = executor;
        this.compactionThreshold = compactionThreshold;
        this.cacheKey = buildCacheKeyFromDatabase();
    }

    /**
     * Invalidates the given resource type in the superclass, then schedules a cache key refresh
     * and, if required, a compaction of the corresponding table.
     *
     * @param resourceType the resource type whose table has changed
     */
    @Override
    public void invalidate(@Nonnull Enumerations.ResourceType resourceType) {
        super.invalidate(resourceType);
        invalidateCache(resourceType);
        compact(resourceType);
    }

    /**
     * Checks whether the supplied key is equal to the current cache key.
     *
     * @param otherKey the key to compare
     * @return {@code true} if a cache key is present and equal to {@code otherKey}
     */
    @Override // au.csiro.pathling.caching.Cacheable
    public boolean cacheKeyMatches(@Nonnull String otherKey) {
        return this.cacheKey.map(key -> key.equals(otherKey)).orElse(false);
    }

    /**
     * Finds the most recent update time across all tables in the database location.
     *
     * @return the latest history timestamp in epoch milliseconds, or empty if the location cannot
     *     be accessed, does not exist, or contains no table with history
     */
    @Nonnull
    private Optional<Long> latestUpdate() {
        log.info("Querying latest snapshot from database: {}", this.path);
        final Configuration hadoopConfiguration = this.spark.sparkContext().hadoopConfiguration();
        Objects.requireNonNull(hadoopConfiguration);

        final FileSystem warehouse;
        try {
            warehouse = FileSystem.get(new URI(this.path), hadoopConfiguration);
        } catch (IOException | URISyntaxException e) {
            log.debug("Unable to access warehouse location, returning empty snapshot time: {}", this.path, e);
            return Optional.empty();
        }
        Objects.requireNonNull(warehouse);

        final Path databasePath = new Path(this.path);
        final FileStatus[] entries;
        try {
            // Act on the result of the existence check rather than relying on listStatus failing.
            if (!warehouse.exists(databasePath)) {
                log.debug("Unable to access database location, returning empty snapshot time: {}", this.path);
                return Optional.empty();
            }
            entries = warehouse.listStatus(databasePath);
        } catch (IOException e) {
            log.debug("Unable to access database location, returning empty snapshot time: {}", this.path, e);
            return Optional.empty();
        }
        Objects.requireNonNull(entries);

        return Arrays.stream(entries)
                .map(entry -> Objects.requireNonNull(entry.getPath()))
                // Match on the file name only: matching the full URI would reject every table
                // whenever the host, bucket or a parent directory contains a dot.
                .filter(tablePath -> TABLE_FILE_NAME.matcher(tablePath.getName()).matches())
                .map(tablePath -> DeltaTable.forPath(this.spark, tablePath.toString()))
                .map(CacheableFileSystemPersistence::latestUpdateToTable)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .max(Long::compare);
    }

    /**
     * Reads the timestamp of the highest-versioned entry in a Delta table's history.
     *
     * @param deltaTable the table to inspect
     * @return the timestamp in epoch milliseconds, or empty if the history query returns no row
     */
    @Nonnull
    private static Optional<Long> latestUpdateToTable(@Nonnull DeltaTable deltaTable) {
        // head(int) is erased to Object when called from Java, hence the cast.
        final Row[] latest = (Row[]) deltaTable.history()
                .orderBy(functions.desc("version"))
                .select("timestamp")
                .head(1);
        return latest.length != 1 ? Optional.empty() : Optional.of(latest[0].getTimestamp(0).getTime());
    }

    /**
     * Schedules a task that recomputes the cache key from the given resource type's table and
     * clears the Spark SQL cache.
     *
     * @param resourceType the resource type whose table has changed
     */
    private void invalidateCache(@Nonnull Enumerations.ResourceType resourceType) {
        this.executor.execute(() -> {
            this.cacheKey = buildCacheKeyFromTable(read(resourceType));
            this.spark.sqlContext().clearCache();
        });
    }

    /**
     * Builds a cache key from the latest history timestamp of a single table.
     *
     * @param deltaTable the table to inspect
     * @return the cache key, or empty if the table has no readable history
     */
    @Nonnull
    private Optional<String> buildCacheKeyFromTable(@Nonnull DeltaTable deltaTable) {
        return latestUpdateToTable(deltaTable).map(this::cacheKeyFromTimestamp);
    }

    /**
     * Builds a cache key from the latest history timestamp across all tables in the database.
     *
     * @return the cache key, or empty if no timestamp could be determined
     */
    @Nonnull
    private Optional<String> buildCacheKeyFromDatabase() {
        return latestUpdate().map(this::cacheKeyFromTimestamp);
    }

    /**
     * Renders a timestamp as a base-36 string.
     *
     * @param timestamp the timestamp in epoch milliseconds
     * @return the base-36 representation of {@code timestamp}
     */
    @Nonnull
    private String cacheKeyFromTimestamp(@Nonnull Long timestamp) {
        return Long.toString(timestamp, 36);
    }

    /**
     * Schedules a rewrite of the resource type's table into Spark's default partitioning when its
     * current partition count exceeds {@link #compactionThreshold}.
     *
     * @param resourceType the resource type whose table should be considered for compaction
     */
    private void compact(@Nonnull Enumerations.ResourceType resourceType) {
        final DeltaTable table = read(resourceType);
        final String tableUrl = getTableUrl(this.path, resourceType);
        final int numPartitions = table.toDF().rdd().getNumPartitions();
        if (numPartitions <= this.compactionThreshold) {
            log.debug("Compaction not needed (number of partitions: {}, threshold: {})", numPartitions, this.compactionThreshold);
            return;
        }
        log.debug("Scheduling table compaction (number of partitions: {}, threshold: {}): {}", numPartitions, this.compactionThreshold, tableUrl);
        this.executor.submit(() -> {
            log.debug("Commencing compaction: {}", tableUrl);
            // The table is re-read inside the task so the rewrite uses its state at execution time.
            read(resourceType)
                    .toDF()
                    .repartition()
                    .write()
                    // Written with the Delta option dataChange=false.
                    .option("dataChange", "false")
                    .format("delta")
                    .mode(SaveMode.Overwrite)
                    .save(tableUrl);
            log.debug("Compaction complete: {}", tableUrl);
        });
    }

    /**
     * Returns the current cache key.
     *
     * @return the cache key, or empty if none could be determined
     */
    @Override // au.csiro.pathling.caching.Cacheable
    @Nonnull
    public Optional<String> getCacheKey() {
        return this.cacheKey;
    }
}
