package org.apache.nifi.provenance.index.lucene;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/provenance/index/lucene/EventIndexTask.class */
public class EventIndexTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(EventIndexTask.class);
    private static final String EVENT_CATEGORY = "Provenance Repository";
    public static final int MAX_DOCUMENTS_PER_THREAD = 100;
    public static final int DEFAULT_MAX_EVENTS_PER_COMMIT = 1000000;
    private final BlockingQueue<StoredDocument> documentQueue;
    private final IndexManager indexManager;
    private volatile boolean shutdown = false;
    private final IndexDirectoryManager directoryManager;
    private final EventReporter eventReporter;
    private final int commitThreshold;

    public EventIndexTask(BlockingQueue<StoredDocument> blockingQueue, RepositoryConfiguration repositoryConfiguration, IndexManager indexManager, IndexDirectoryManager indexDirectoryManager, int i, EventReporter eventReporter) {
        this.documentQueue = blockingQueue;
        this.indexManager = indexManager;
        this.directoryManager = indexDirectoryManager;
        this.commitThreshold = i;
        this.eventReporter = eventReporter;
    }

    public void shutdown() {
        this.shutdown = true;
    }

    private void fetchDocuments(List<StoredDocument> list) throws InterruptedException {
        StoredDocument poll = this.documentQueue.poll(1L, TimeUnit.SECONDS);
        if (poll == null) {
            return;
        }
        list.add(poll);
        this.documentQueue.drainTo(list, 99);
    }

    @Override // java.lang.Runnable
    public void run() {
        ArrayList arrayList = new ArrayList(100);
        while (!this.shutdown) {
            try {
                arrayList.clear();
                fetchDocuments(arrayList);
                if (!arrayList.isEmpty()) {
                    for (Map.Entry entry : ((Map) arrayList.stream().collect(Collectors.groupingBy(storedDocument -> {
                        return storedDocument.getStorageSummary().getPartitionName().get();
                    }))).entrySet()) {
                        index((List) entry.getValue(), (String) entry.getKey());
                    }
                }
            } catch (Exception e) {
                logger.error("Failed to index Provenance Events", e);
                this.eventReporter.reportEvent(Severity.ERROR, "Provenance Repository", "Failed to index Provenance Events. See logs for more information.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reIndex(List<IndexableDocument> list, CommitPreference commitPreference) throws IOException {
        if (list.isEmpty()) {
            return;
        }
        for (Map.Entry entry : ((Map) list.stream().collect(Collectors.groupingBy(indexableDocument -> {
            return indexableDocument.getIndexDirectory();
        }))).entrySet()) {
            File file = (File) entry.getKey();
            List list2 = (List) entry.getValue();
            EventIndexWriter borrowIndexWriter = this.indexManager.borrowIndexWriter(file);
            try {
                long j = Long.MAX_VALUE;
                long j2 = Long.MIN_VALUE;
                Iterator<IndexableDocument> it = list.iterator();
                while (it.hasNext()) {
                    long longValue = it.next().getDocument().getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
                    if (longValue < j) {
                        j = longValue;
                    }
                    if (longValue > j2) {
                        j2 = longValue;
                    }
                }
                borrowIndexWriter.getIndexWriter().deleteDocuments(new Query[]{NumericRangeQuery.newLongRange(SearchableFields.Identifier.getSearchableFieldName(), Long.valueOf(j), Long.valueOf(j2), true, true)});
                borrowIndexWriter.index((List<Document>) list2.stream().map(indexableDocument2 -> {
                    return indexableDocument2.getDocument();
                }).collect(Collectors.toList()), this.commitThreshold);
                this.indexManager.returnIndexWriter(borrowIndexWriter, CommitPreference.FORCE_COMMIT.equals(commitPreference), false);
            } catch (Throwable th) {
                this.indexManager.returnIndexWriter(borrowIndexWriter, CommitPreference.FORCE_COMMIT.equals(commitPreference), false);
                throw th;
            }
        }
    }

    private void index(List<StoredDocument> list, String str) throws IOException {
        File writableIndexingDirectory;
        EventIndexWriter borrowIndexWriter;
        if (list.isEmpty()) {
            return;
        }
        List<Document> list2 = (List) list.stream().map(storedDocument -> {
            return storedDocument.getDocument();
        }).collect(Collectors.toList());
        boolean z = false;
        boolean z2 = false;
        long asLong = list.stream().mapToLong(storedDocument2 -> {
            return storedDocument2.getDocument().getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue();
        }).min().getAsLong();
        synchronized (this.directoryManager) {
            writableIndexingDirectory = this.directoryManager.getWritableIndexingDirectory(asLong, str);
            borrowIndexWriter = this.indexManager.borrowIndexWriter(writableIndexingDirectory);
        }
        try {
            boolean index = borrowIndexWriter.index(list2, this.commitThreshold);
            Optional<File> activeIndexDirectory = this.directoryManager.getActiveIndexDirectory(str);
            if (!activeIndexDirectory.isPresent() || !activeIndexDirectory.get().equals(writableIndexingDirectory)) {
                z2 = true;
                z = true;
            }
            if (index) {
                commit(borrowIndexWriter);
                z2 = false;
                z = z || this.directoryManager.onIndexCommitted(writableIndexingDirectory);
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed index {} after writing a max Event ID of {}", writableIndexingDirectory, Long.valueOf(list2.stream().mapToLong(document -> {
                        return document.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
                    }).max().orElse(-1L)));
                }
            }
        } finally {
            this.indexManager.returnIndexWriter(borrowIndexWriter, z2, z);
        }
    }

    protected void commit(EventIndexWriter eventIndexWriter) throws IOException {
        long nanoTime = System.nanoTime();
        logger.debug("Successfully committed approximately {} Events to {} in {} millis", new Object[]{Long.valueOf(eventIndexWriter.commit()), eventIndexWriter, Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
    }
}
