package org.apache.nifi.questdb.embedded;

import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.DefaultCairoConfiguration;
import io.questdb.cairo.TableToken;
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.questdb.Client;
import org.apache.nifi.questdb.DatabaseException;
import org.apache.nifi.questdb.DatabaseManager;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.class */
/**
 * {@link DatabaseManager} implementation backed by an embedded QuestDB {@link CairoEngine}.
 *
 * <p>The manager moves through the {@link EmbeddedDatabaseManagerStatus} states: it starts as
 * {@code UNINITIALIZED}, becomes {@code REPAIRING} while the database is being verified or
 * recreated, and ends up {@code HEALTHY} or {@code CORRUPTED}. {@link #close()} moves it to
 * {@code CLOSED}.
 *
 * <p>Structural changes (verification, recreation, shutdown) are performed while holding the write
 * lock of {@code databaseStructureLock}; clients handed out by {@link #acquireClient()} are wrapped
 * with the corresponding read lock.
 */
final class EmbeddedDatabaseManager implements DatabaseManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedDatabaseManager.class);

    private final EmbeddedDatabaseManagerContext context;
    private final String id = UUID.randomUUID().toString();
    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new AtomicReference<>(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
    private final ReadWriteLock databaseStructureLock = new ReentrantReadWriteLock();
    private final AtomicReference<CairoEngine> engine = new AtomicReference<>();
    private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + this.id + "-%d").build());

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a manager in the {@code UNINITIALIZED} state; {@link #init()} must be called before use.
     *
     * @param embeddedDatabaseManagerContext configuration (locations, table definitions, retry and
     *                                       rollover settings) used by this manager
     */
    public EmbeddedDatabaseManager(EmbeddedDatabaseManagerContext embeddedDatabaseManagerContext) {
        this.context = embeddedDatabaseManagerContext;
    }

    /**
     * Verifies (and if necessary recreates) the database, then schedules the rollover task.
     *
     * @throws IllegalStateException if the manager has already left the {@code UNINITIALIZED} state
     */
    @Override // org.apache.nifi.questdb.DatabaseManager
    public void init() {
        if (this.state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
            throw new IllegalStateException("Manager is already initialized");
        }
        ensureDatabaseIsReady();
        startRollover();
    }

    /**
     * Brings the database into a usable state while holding the structure write lock.
     *
     * <p>If the first verification reports corruption, the persisted files are moved to the backup
     * location (or deleted when the move fails) and the database is built again from scratch. The
     * state ends up {@code HEALTHY} on success and {@code CORRUPTED} otherwise; in the latter case
     * the engine is closed and its reference cleared.
     */
    private void ensureDatabaseIsReady() {
        // Acquire outside the try block so the finally clause never unlocks a lock that was not obtained.
        this.databaseStructureLock.writeLock().lock();
        boolean ready = false;
        try {
            this.state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
            try {
                prepareDatabase();
                ready = true;
            } catch (CorruptedDatabaseException corruption) {
                LOGGER.error("Database is corrupted. Recreation is triggered. Manager tries to move corrupted database files to the backup location: {}", this.context.getBackupLocation(), corruption);
                if (removeCorruptedDatabase()) {
                    try {
                        prepareDatabase();
                        ready = true;
                    } catch (CorruptedDatabaseException recreationFailure) {
                        LOGGER.error("Could not recreate database", recreationFailure);
                    }
                }
            }
        } finally {
            try {
                this.state.set(ready ? EmbeddedDatabaseManagerStatus.HEALTHY : EmbeddedDatabaseManagerStatus.CORRUPTED);
                if (!ready) {
                    // Close the engine instead of only dropping the reference, so it is not leaked.
                    closeEngineQuietly(this.engine.getAndSet(null));
                }
            } finally {
                this.databaseStructureLock.writeLock().unlock();
            }
        }
    }

    /** Runs the three verification steps: directory access, engine creation and table checks. */
    private void prepareDatabase() throws CorruptedDatabaseException {
        ensurePersistLocationIsAccessible();
        ensureConnectionEstablished();
        ensureTablesAreInPlaceAndHealthy();
    }

    /**
     * Gets the corrupted database files out of the way: moves them to a timestamped backup
     * directory, or deletes them when the move fails.
     *
     * @return {@code true} if the persist location was either moved or deleted
     */
    private boolean removeCorruptedDatabase() {
        try {
            File backupRoot = this.context.getBackupLocationAsPath().toFile();
            File backupTarget = new File(backupRoot, "backup_" + System.currentTimeMillis());
            FileUtils.ensureDirectoryExistAndCanAccess(backupRoot);
            Files.move(this.context.getPersistLocationAsPath(), backupTarget.toPath());
            return true;
        } catch (IOException backupFailure) {
            LOGGER.error("Could not create backup", backupFailure);
        }

        try {
            FileUtils.deleteFile(this.context.getPersistLocationAsPath().toFile(), true);
            return true;
        } catch (IOException cleanupFailure) {
            LOGGER.error("Could not clean up corrupted database", cleanupFailure);
            return false;
        }
    }

    /** Closes the given engine if present; a failure is logged and not propagated. */
    private static void closeEngineQuietly(CairoEngine candidate) {
        if (candidate == null) {
            return;
        }
        try {
            candidate.close();
        } catch (RuntimeException closeFailure) {
            LOGGER.warn("Could not close database engine", closeFailure);
        }
    }

    /**
     * Makes sure the persist directory exists and is accessible.
     *
     * @throws CorruptedDatabaseException if the directory cannot be created or accessed
     */
    private void ensurePersistLocationIsAccessible() throws CorruptedDatabaseException {
        try {
            FileUtils.ensureDirectoryExistAndCanAccess(this.context.getPersistLocationAsPath().toFile());
        } catch (Exception e) {
            throw new CorruptedDatabaseException(String.format("Database directory creation failed [%s]", this.context.getPersistLocationAsPath()), e);
        }
    }

    /**
     * Closes any previously created engine and creates a new one on the persist location.
     *
     * @throws CorruptedDatabaseException if the engine cannot be created
     */
    private void ensureConnectionEstablished() throws CorruptedDatabaseException {
        CairoEngine previous = this.engine.getAndSet(null);
        if (previous != null) {
            previous.close();
        }

        String persistPath = this.context.getPersistLocationAsPath().toFile().getAbsolutePath();
        try {
            CairoEngine created = new CairoEngine(new DefaultCairoConfiguration(persistPath));
            LOGGER.info("Database connection successful [{}]", persistPath);
            this.engine.set(created);
        } catch (Exception e) {
            throw new CorruptedDatabaseException(String.format("Database connection failed [%s]", persistPath), e);
        }
    }

    /**
     * Creates every managed table that has no entry in the persist directory yet and then checks
     * that each managed table can be queried.
     *
     * @throws CorruptedDatabaseException if the directory cannot be listed, a table cannot be
     *                                    created, a table name collides with a regular file, a
     *                                    table cannot be read, or the client cannot be disconnected
     */
    private void ensureTablesAreInPlaceAndHealthy() throws CorruptedDatabaseException {
        File persistDirectory = this.context.getPersistLocationAsPath().toFile();
        File[] entries = persistDirectory.listFiles();
        if (entries == null) {
            // listFiles() returns null when the path is not a directory or an I/O error occurs.
            throw new CorruptedDatabaseException(String.format("Database directory cannot be listed [%s]", persistDirectory.getAbsolutePath()));
        }
        Map<String, File> entriesByName = Arrays.stream(entries).collect(Collectors.toMap(File::getName, entry -> entry));

        Client client = getUnmanagedClient();
        try {
            for (ManagedTableDefinition definition : this.context.getTableDefinitions()) {
                ensureTableExists(client, definition, entriesByName.get(definition.getName()));
            }
            for (ManagedTableDefinition definition : this.context.getTableDefinitions()) {
                ensureTableIsHealthy(client, definition);
            }
        } finally {
            try {
                client.disconnect();
            } catch (DatabaseException e) {
                throw new CorruptedDatabaseException(e);
            }
        }
    }

    /** Creates the table when it has no directory entry; rejects a non-directory entry of the same name. */
    private void ensureTableExists(Client client, ManagedTableDefinition definition, File existingEntry) throws CorruptedDatabaseException {
        if (existingEntry == null) {
            try {
                LOGGER.debug("Creating table {}", definition.getName());
                client.execute(definition.getDefinition());
                LOGGER.debug("Table {} is created", definition.getName());
            } catch (DatabaseException e) {
                throw new CorruptedDatabaseException(String.format("Creating table [%s] has failed", definition.getName()), e);
            }
        } else if (!existingEntry.isDirectory()) {
            throw new CorruptedDatabaseException(String.format("Table %s cannot be created because there is already a file exists with the given name", definition.getName()));
        }
    }

    /** Checks that the engine knows the table and that a single row can be selected from it. */
    private void ensureTableIsHealthy(Client client, ManagedTableDefinition definition) throws CorruptedDatabaseException {
        String tableName = definition.getName();
        try {
            CairoEngine currentEngine = this.engine.get();
            TableToken token = currentEngine.getTableTokenIfExists(tableName);
            if (token == null) {
                throw new CorruptedDatabaseException(String.format("Table [%s] is not known by the database engine", tableName));
            }
            if (token.isWal()) {
                currentEngine.getSequencerMetadata(token).close();
            }
            // Lower-case %s keeps the table name as defined; %S would upper-case it using the default locale.
            client.execute(String.format("SELECT * FROM %s LIMIT 1", tableName));
        } catch (CorruptedDatabaseException e) {
            throw e;
        } catch (Exception e) {
            throw new CorruptedDatabaseException(e);
        }
    }

    /** Schedules the {@link RolloverWorker} with the configured frequency as both initial delay and period. */
    private void startRollover() {
        long frequencyMillis = this.context.getRolloverFrequency().toMillis();
        RolloverWorker worker = new RolloverWorker(acquireClient(), this.context.getTableDefinitions());
        this.scheduledFutures.add(this.scheduledExecutorService.scheduleWithFixedDelay(worker, frequencyMillis, frequencyMillis, TimeUnit.MILLISECONDS));
        LOGGER.debug("Rollover started");
    }

    /** Cancels every scheduled rollover task and shuts the scheduler down immediately. */
    private void stopRollover() {
        LOGGER.debug("Rollover shutdown started");
        int cancelled = 0;
        int notCancelled = 0;
        for (ScheduledFuture<?> scheduledFuture : this.scheduledFutures) {
            if (scheduledFuture.cancel(true)) {
                cancelled++;
            } else {
                notCancelled++;
            }
        }
        LOGGER.debug("Rollover shutdown task cancellation status: completed [{}] failed [{}]", cancelled, notCancelled);
        LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks [{}]", this.scheduledExecutorService.shutdownNow().size());
    }

    /** Returns a client that talks to the current engine without locking, retrying or state checks. */
    private Client getUnmanagedClient() {
        return new EmbeddedClient(() -> this.engine.get());
    }

    /**
     * Returns a client for the managed database.
     *
     * <p>When the database is {@code CORRUPTED} a {@link NoOpClient} is returned. Otherwise the
     * client is wrapped so that calls run only while the state is {@code HEALTHY}, under the
     * structure read lock, and with retries that report failures to {@code errorAction}.
     *
     * @return a client; never {@code null}
     * @throws IllegalStateException if the manager has not been initialized
     */
    @Override // org.apache.nifi.questdb.DatabaseManager
    public Client acquireClient() {
        checkIfManagerIsInitialised();
        NoOpClient fallback = new NoOpClient();
        if (this.state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) {
            LOGGER.error("The database is corrupted: Status History will not be stored");
            return fallback;
        }

        Client conditionAware = new ConditionAwareClient(() -> this.state.get() == EmbeddedDatabaseManagerStatus.HEALTHY, getUnmanagedClient());
        Client locked = new LockedClient(this.databaseStructureLock.readLock(), this.context.getLockAttemptTime(), conditionAware);
        return RetryingClient.getInstance(this.context.getNumberOfAttemptedRetries(), (attempt, error) -> errorAction(attempt, error), locked, fallback);
    }

    /** @throws IllegalStateException if the manager is still {@code UNINITIALIZED} */
    private void checkIfManagerIsInitialised() {
        if (this.state.get() == EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
            throw new IllegalStateException("The state of the database manager is not initialized");
        }
    }

    /**
     * Reacts to a failed client attempt: either triggers a database restore or logs the failure.
     *
     * @param i  number of the failed attempt
     * @param th the failure reported for the attempt
     */
    private void errorAction(int i, Throwable th) {
        if (shouldRestoreDatabase(i, th)) {
            // Include the cause so the failure that triggered the restore is not lost.
            LOGGER.error("Database manager tries to restore database after the first failed attempt if necessary", th);
            ensureDatabaseIsReady();
        } else {
            LOGGER.warn("Error happened at attempt: {}", i, th);
        }
    }

    /**
     * A restore is attempted only after the first failed attempt, only while the manager is neither
     * {@code CORRUPTED} nor {@code CLOSED}, and only for failures other than the condition, lock
     * and disconnect exceptions listed below.
     */
    private boolean shouldRestoreDatabase(int i, Throwable th) {
        if (i != 1) {
            return false;
        }
        EmbeddedDatabaseManagerStatus current = this.state.get();
        if (current == EmbeddedDatabaseManagerStatus.CORRUPTED || current == EmbeddedDatabaseManagerStatus.CLOSED) {
            return false;
        }
        return !(th instanceof ConditionFailedException
                || th instanceof LockUnsuccessfulException
                || th instanceof ClientDisconnectedException);
    }

    /**
     * Stops the rollover task, marks the manager {@code CLOSED} and closes the engine, all under
     * the structure write lock.
     *
     * @throws IllegalStateException if the manager has not been initialized
     */
    @Override // org.apache.nifi.questdb.DatabaseManager, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.databaseStructureLock.writeLock().lock();
        try {
            checkIfManagerIsInitialised();
            stopRollover();
            this.state.set(EmbeddedDatabaseManagerStatus.CLOSED);
            // Clear the reference so a repeated close() does not close the same engine twice.
            CairoEngine current = this.engine.getAndSet(null);
            if (current != null) {
                current.close();
            }
        } finally {
            // Always release the lock, including when the initialization check above throws.
            this.databaseStructureLock.writeLock().unlock();
        }
    }
}
