package org.apache.nifi.provenance.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.store.iterator.AuthorizingEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/provenance/store/PartitionedEventStore.class */
public abstract class PartitionedEventStore implements EventStore {
    private static final Logger logger = LoggerFactory.getLogger(PartitionedEventStore.class);
    private static final String EVENT_CATEGORY = "Provenance Repository";
    private final AtomicLong partitionIndex = new AtomicLong(0);
    private final RepositoryConfiguration repoConfig;
    private final EventReporter eventReporter;
    private ScheduledExecutorService maintenanceExecutor;

    public PartitionedEventStore(RepositoryConfiguration repositoryConfiguration, EventReporter eventReporter) {
        this.repoConfig = repositoryConfiguration;
        this.eventReporter = eventReporter;
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public void initialize() throws IOException {
        this.maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
        long maintenanceFrequency = this.repoConfig.getMaintenanceFrequency(TimeUnit.MILLISECONDS);
        this.maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, maintenanceFrequency, maintenanceFrequency, TimeUnit.MILLISECONDS);
        Iterator<? extends EventStorePartition> it = getPartitions().iterator();
        while (it.hasNext()) {
            it.next().initialize();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.maintenanceExecutor != null) {
            this.maintenanceExecutor.shutdownNow();
        }
        IOException iOException = null;
        Iterator<? extends EventStorePartition> it = getPartitions().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (IOException e) {
                if (iOException == null) {
                    iOException = e;
                } else {
                    iOException.addSuppressed(e);
                }
            }
        }
        if (iOException != null) {
            throw iOException;
        }
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public StorageResult addEvents(Iterable<ProvenanceEventRecord> iterable) throws IOException {
        return getPartitions().get((int) (this.partitionIndex.getAndIncrement() % r0.size())).addEvents(iterable);
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public long getSize() {
        long j = 0;
        Iterator<? extends EventStorePartition> it = getPartitions().iterator();
        while (it.hasNext()) {
            j += it.next().getSize();
        }
        return j;
    }

    private long getRepoSize() {
        long j = 0;
        Iterator<File> it = this.repoConfig.getStorageDirectories().values().iterator();
        while (it.hasNext()) {
            j += DirectoryUtils.getSize(it.next());
        }
        return j;
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public long getMaxEventId() {
        return getPartitions().stream().mapToLong((v0) -> {
            return v0.getMaxEventId();
        }).max().orElse(-1L);
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public Optional<ProvenanceEventRecord> getEvent(long j) throws IOException {
        Iterator<? extends EventStorePartition> it = getPartitions().iterator();
        while (it.hasNext()) {
            Optional<ProvenanceEventRecord> event = it.next().getEvent(j);
            if (event.isPresent()) {
                return event;
            }
        }
        return Optional.empty();
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public List<ProvenanceEventRecord> getEvents(long j, int i) throws IOException {
        return getEvents(j, i, EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER);
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public List<ProvenanceEventRecord> getEvents(long j, int i, EventAuthorizer eventAuthorizer, EventTransformer eventTransformer) throws IOException {
        return (j + ((long) i) < 1 || i < 1 || j > getMaxEventId()) ? Collections.emptyList() : getEvents(i, eventAuthorizer, eventStorePartition -> {
            return eventStorePartition.createEventIterator(j);
        }, eventTransformer);
    }

    @Override // org.apache.nifi.provenance.store.EventStore
    public List<ProvenanceEventRecord> getEvents(List<Long> list, EventAuthorizer eventAuthorizer, EventTransformer eventTransformer) throws IOException {
        return (list == null || list.isEmpty()) ? Collections.emptyList() : getEvents(list.size(), eventAuthorizer, eventStorePartition -> {
            return eventStorePartition.createEventIterator((List<Long>) list);
        }, eventTransformer);
    }

    private List<ProvenanceEventRecord> getEvents(int i, EventAuthorizer eventAuthorizer, Function<EventStorePartition, EventIterator> function, EventTransformer eventTransformer) throws IOException {
        if (i < 1) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        TreeMap treeMap = new TreeMap((provenanceEventRecord, provenanceEventRecord2) -> {
            return Long.compare(provenanceEventRecord.getEventId(), provenanceEventRecord2.getEventId());
        });
        ArrayList<EventIterator> arrayList2 = new ArrayList();
        try {
            Iterator<? extends EventStorePartition> it = getPartitions().iterator();
            while (it.hasNext()) {
                AuthorizingEventIterator authorizingEventIterator = new AuthorizingEventIterator(function.apply(it.next()), eventAuthorizer == null ? EventAuthorizer.GRANT_ALL : eventAuthorizer, eventTransformer);
                arrayList2.add(authorizingEventIterator);
                Optional<ProvenanceEventRecord> nextEvent = authorizingEventIterator.nextEvent();
                if (nextEvent.isPresent()) {
                    treeMap.put(nextEvent.get(), authorizingEventIterator);
                }
            }
            if (treeMap.isEmpty()) {
                return arrayList;
            }
            for (ProvenanceEventRecord provenanceEventRecord3 = (ProvenanceEventRecord) treeMap.firstKey(); provenanceEventRecord3 != null; provenanceEventRecord3 = treeMap.isEmpty() ? null : (ProvenanceEventRecord) treeMap.firstKey()) {
                if (arrayList.size() >= i) {
                    break;
                }
                arrayList.add(provenanceEventRecord3);
                EventIterator eventIterator = (EventIterator) treeMap.remove(provenanceEventRecord3);
                Optional<ProvenanceEventRecord> nextEvent2 = eventIterator.nextEvent();
                if (nextEvent2.isPresent()) {
                    treeMap.put(nextEvent2.get(), eventIterator);
                }
            }
            for (EventIterator eventIterator2 : arrayList2) {
                try {
                    eventIterator2.close();
                } catch (Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.warn("Failed to close Record Reader {}", eventIterator2, e);
                    } else {
                        logger.warn("Failed to close Record Reader {}", eventIterator2);
                    }
                }
            }
            return arrayList;
        } finally {
            for (EventIterator eventIterator3 : arrayList2) {
                try {
                    eventIterator3.close();
                } catch (Exception e2) {
                    if (logger.isDebugEnabled()) {
                        logger.warn("Failed to close Record Reader {}", eventIterator3, e2);
                    } else {
                        logger.warn("Failed to close Record Reader {}", eventIterator3);
                    }
                }
            }
        }
    }

    void performMaintenance() {
        try {
            long maxRecordLife = this.repoConfig.getMaxRecordLife(TimeUnit.MILLISECONDS);
            for (EventStorePartition eventStorePartition : getPartitions()) {
                try {
                    eventStorePartition.purgeOldEvents(maxRecordLife, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    logger.error("Failed to purge expired events from " + String.valueOf(eventStorePartition), e);
                    this.eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "Failed to purge expired events from Provenance Repository. See logs for more information.");
                }
            }
            long maxStorageCapacity = this.repoConfig.getMaxStorageCapacity();
            try {
                long repoSize = getRepoSize();
                while (repoSize > maxStorageCapacity) {
                    for (EventStorePartition eventStorePartition2 : getPartitions()) {
                        try {
                            repoSize -= eventStorePartition2.purgeOldestEvents();
                        } catch (Exception e2) {
                            logger.error("Failed to purge oldest events from " + String.valueOf(eventStorePartition2), e2);
                            this.eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "Failed to purge oldest events from Provenance Repository. See logs for more information.");
                        }
                    }
                }
            } catch (Exception e3) {
                logger.error("Could not determine size of Provenance Repository. Will not expire any data due to storage limits", e3);
                this.eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "Failed to determine size of Provenance Repository. No data will be expired due to storage limits at this time. See logs for more information.");
            }
        } catch (Exception e4) {
            logger.error("Failed to perform periodic maintenance", e4);
            this.eventReporter.reportEvent(Severity.ERROR, "Provenance Repository", "Failed to perform periodic maintenace for Provenance Repository. See logs for more information.");
        }
    }

    protected abstract List<? extends EventStorePartition> getPartitions();
}
