/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.status.history.questdb;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import org.apache.nifi.controller.status.history.questdb.CapturedStatus;
import org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class BufferedStatusHistoryStorage
implements StatusHistoryStorage {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedStatusHistoryStorage.class);
    private final String id = UUID.randomUUID().toString();
    private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, (ThreadFactory)BasicThreadFactory.builder().namingPattern("BufferedStatusHistoryStorage-" + this.id + "-%d").build());
    private final StatusHistoryStorage storage;
    private final long persistFrequencyInMs;
    private final int persistBatchSize;
    private final BlockingQueue<CapturedStatus<NodeStatus>> nodeStatusQueue = new LinkedBlockingQueue<CapturedStatus<NodeStatus>>();
    private final BlockingQueue<CapturedStatus<GarbageCollectionStatus>> garbageCollectionStatusQueue = new LinkedBlockingQueue<CapturedStatus<GarbageCollectionStatus>>();
    private final BlockingQueue<CapturedStatus<ProcessGroupStatus>> processGroupStatusQueue = new LinkedBlockingQueue<CapturedStatus<ProcessGroupStatus>>();
    private final BlockingQueue<CapturedStatus<ConnectionStatus>> connectionStatusQueue = new LinkedBlockingQueue<CapturedStatus<ConnectionStatus>>();
    private final BlockingQueue<CapturedStatus<RemoteProcessGroupStatus>> remoteProcessGroupStatusQueue = new LinkedBlockingQueue<CapturedStatus<RemoteProcessGroupStatus>>();
    private final BlockingQueue<CapturedStatus<ProcessorStatus>> processorStatusQueue = new LinkedBlockingQueue<CapturedStatus<ProcessorStatus>>();

    public BufferedStatusHistoryStorage(StatusHistoryStorage storage, long persistFrequencyInMs, int persistBatchSize) {
        this.storage = storage;
        this.persistFrequencyInMs = persistFrequencyInMs;
        this.persistBatchSize = persistBatchSize;
    }

    @Override
    public void init() {
        this.storage.init();
        ScheduledFuture<?> future = this.scheduledExecutorService.scheduleWithFixedDelay(new BufferedStatusHistoryStorageWorker(), this.persistFrequencyInMs, this.persistFrequencyInMs, TimeUnit.MILLISECONDS);
        this.scheduledFutures.add(future);
        LOGGER.info("Flushing is initiated");
    }

    @Override
    public void close() {
        this.storage.close();
        LOGGER.debug("Flushing shutdown started");
        int cancelCompleted = 0;
        int cancelFailed = 0;
        for (ScheduledFuture<?> scheduledFuture : this.scheduledFutures) {
            boolean cancelled = scheduledFuture.cancel(true);
            if (cancelled) {
                ++cancelCompleted;
                continue;
            }
            ++cancelFailed;
        }
        LOGGER.debug("Flushing shutdown task cancellation status: completed [{}] failed [{}]", (Object)cancelCompleted, (Object)cancelFailed);
        List<Runnable> tasks = this.scheduledExecutorService.shutdownNow();
        LOGGER.debug("Scheduled Task Service shutdown remaining tasks [{}]", (Object)tasks.size());
    }

    @Override
    public List<StatusSnapshot> getConnectionSnapshots(String componentId, Date start, Date end) {
        return this.storage.getConnectionSnapshots(componentId, start, end);
    }

    @Override
    public List<StatusSnapshot> getProcessGroupSnapshots(String componentId, Date start, Date end) {
        return this.storage.getProcessGroupSnapshots(componentId, start, end);
    }

    @Override
    public List<StatusSnapshot> getRemoteProcessGroupSnapshots(String componentId, Date start, Date end) {
        return this.storage.getRemoteProcessGroupSnapshots(componentId, start, end);
    }

    @Override
    public List<StatusSnapshot> getProcessorSnapshots(String componentId, Date start, Date end) {
        return this.storage.getProcessorSnapshots(componentId, start, end);
    }

    @Override
    public List<StatusSnapshot> getProcessorSnapshotsWithCounters(String componentId, Date start, Date end) {
        return this.storage.getProcessorSnapshotsWithCounters(componentId, start, end);
    }

    @Override
    public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(Date start, Date end) {
        return this.storage.getGarbageCollectionSnapshots(start, end);
    }

    @Override
    public List<StatusSnapshot> getNodeStatusSnapshots(Date start, Date end) {
        return this.storage.getNodeStatusSnapshots(start, end);
    }

    @Override
    public void storeNodeStatuses(Collection<CapturedStatus<NodeStatus>> statuses) {
        this.nodeStatusQueue.addAll(statuses);
    }

    @Override
    public void storeGarbageCollectionStatuses(Collection<CapturedStatus<GarbageCollectionStatus>> statuses) {
        this.garbageCollectionStatusQueue.addAll(statuses);
    }

    @Override
    public void storeProcessGroupStatuses(Collection<CapturedStatus<ProcessGroupStatus>> statuses) {
        this.processGroupStatusQueue.addAll(statuses);
    }

    @Override
    public void storeConnectionStatuses(Collection<CapturedStatus<ConnectionStatus>> statuses) {
        this.connectionStatusQueue.addAll(statuses);
    }

    @Override
    public void storeRemoteProcessorGroupStatuses(Collection<CapturedStatus<RemoteProcessGroupStatus>> statuses) {
        this.remoteProcessGroupStatusQueue.addAll(statuses);
    }

    @Override
    public void storeProcessorStatuses(Collection<CapturedStatus<ProcessorStatus>> statuses) {
        this.processorStatusQueue.addAll(statuses);
    }

    private class BufferedStatusHistoryStorageWorker
    implements Runnable {
        private BufferedStatusHistoryStorageWorker() {
        }

        @Override
        public void run() {
            LOGGER.debug("Start flushing");
            this.flush(BufferedStatusHistoryStorage.this.nodeStatusQueue, BufferedStatusHistoryStorage.this.storage::storeNodeStatuses);
            this.flush(BufferedStatusHistoryStorage.this.garbageCollectionStatusQueue, BufferedStatusHistoryStorage.this.storage::storeGarbageCollectionStatuses);
            this.flush(BufferedStatusHistoryStorage.this.processGroupStatusQueue, BufferedStatusHistoryStorage.this.storage::storeProcessGroupStatuses);
            this.flush(BufferedStatusHistoryStorage.this.connectionStatusQueue, BufferedStatusHistoryStorage.this.storage::storeConnectionStatuses);
            this.flush(BufferedStatusHistoryStorage.this.remoteProcessGroupStatusQueue, BufferedStatusHistoryStorage.this.storage::storeRemoteProcessorGroupStatuses);
            this.flush(BufferedStatusHistoryStorage.this.processorStatusQueue, BufferedStatusHistoryStorage.this.storage::storeProcessorStatuses);
            LOGGER.debug("Finish flushing");
        }

        private <T> void flush(BlockingQueue<T> source, Consumer<Collection<T>> target) {
            ArrayList statusEntries = new ArrayList(BufferedStatusHistoryStorage.this.persistBatchSize);
            source.drainTo(statusEntries, BufferedStatusHistoryStorage.this.persistBatchSize);
            if (!statusEntries.isEmpty()) {
                try {
                    target.accept(statusEntries);
                }
                catch (Exception e) {
                    LOGGER.error("Error during flushing buffered status history information.", (Throwable)e);
                }
            }
        }
    }
}

