package org.apache.nifi.c2.client.service;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/c2/client/service/C2OperationManager.class */
public class C2OperationManager implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(C2OperationManager.class);
    private final C2Client client;
    private final C2OperationHandlerProvider c2OperationHandlerProvider;
    private final ReentrantLock heartbeatLock;
    private final OperationQueueDAO operationQueueDAO;
    private final C2OperationRestartHandler c2OperationRestartHandler;
    private final BlockingQueue<C2Operation> c2Operations = new LinkedBlockingQueue();

    public C2OperationManager(C2Client c2Client, C2OperationHandlerProvider c2OperationHandlerProvider, ReentrantLock reentrantLock, OperationQueueDAO operationQueueDAO, C2OperationRestartHandler c2OperationRestartHandler) {
        this.client = c2Client;
        this.c2OperationHandlerProvider = c2OperationHandlerProvider;
        this.heartbeatLock = reentrantLock;
        this.operationQueueDAO = operationQueueDAO;
        this.c2OperationRestartHandler = c2OperationRestartHandler;
    }

    public void add(C2Operation c2Operation) {
        try {
            this.c2Operations.put(c2Operation);
        } catch (InterruptedException e) {
            LOGGER.warn("Thread was interrupted", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        processRestartState();
        while (true) {
            try {
                C2Operation take = this.c2Operations.take();
                LOGGER.debug("Processing operation {}", take);
                C2OperationHandler orElse = this.c2OperationHandlerProvider.getHandlerForOperation(take).orElse(null);
                if (orElse == null) {
                    LOGGER.debug("No handler is present for C2 Operation {}, available handlers {}", take, this.c2OperationHandlerProvider.getHandlers());
                } else {
                    C2OperationAck handle = orElse.handle(take);
                    if (requiresRestart(orElse, handle)) {
                        this.heartbeatLock.lock();
                        LOGGER.debug("Restart is required. Heartbeats are stopped until restart is completed");
                        Optional<C2OperationState> initRestart = initRestart(take);
                        if (!initRestart.isPresent()) {
                            LOGGER.debug("Restart in progress, stopping C2OperationManager");
                            return;
                        }
                        try {
                            C2OperationState c2OperationState = initRestart.get();
                            LOGGER.debug("Restart handler returned with a failed state {}", c2OperationState);
                            handle.setOperationState(c2OperationState);
                            sendAcknowledge(handle);
                            this.operationQueueDAO.cleanup();
                            LOGGER.debug("Heartbeats are enabled again");
                            this.heartbeatLock.unlock();
                        } catch (Throwable th) {
                            this.operationQueueDAO.cleanup();
                            LOGGER.debug("Heartbeats are enabled again");
                            this.heartbeatLock.unlock();
                            throw th;
                        }
                    } else {
                        LOGGER.debug("No restart is required. Sending ACK to C2 server {}", handle);
                        sendAcknowledge(handle);
                    }
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Thread was interrupted", e);
                return;
            }
        }
    }

    private void processRestartState() {
        Optional<OperationQueue> load = this.operationQueueDAO.load();
        load.map((v0) -> {
            return v0.getRemainingOperations();
        }).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        })).ifPresent(this::processRemainingOperations);
        load.map((v0) -> {
            return v0.getCurrentOperation();
        }).ifPresentOrElse(this::processCurrentOperation, () -> {
            LOGGER.debug("No operation to acknowledge to C2 server");
        });
        load.ifPresent(operationQueue -> {
            this.operationQueueDAO.cleanup();
        });
    }

    private void processRemainingOperations(List<C2Operation> list) {
        LOGGER.debug("Found remaining operations operations after restart. Heartbeats are stopped until processing is completed");
        this.heartbeatLock.lock();
        try {
            LinkedList linkedList = new LinkedList();
            linkedList.addAll(list);
            linkedList.addAll(this.c2Operations);
            this.c2Operations.clear();
            BlockingQueue<C2Operation> blockingQueue = this.c2Operations;
            Objects.requireNonNull(blockingQueue);
            linkedList.forEach((v1) -> {
                r1.add(v1);
            });
        } catch (Exception e) {
            LOGGER.warn("Unable to recover operations from operation queue", e);
        } finally {
            this.heartbeatLock.unlock();
            LOGGER.debug("Heartbeat lock released");
        }
    }

    private void processCurrentOperation(C2Operation c2Operation) {
        LOGGER.debug("Found operation {} to acknowledge to C2 server", c2Operation);
        C2OperationState c2OperationState = (C2OperationState) this.c2OperationRestartHandler.waitForResponse().map(this::c2OperationState).orElse(c2OperationState(C2OperationState.OperationState.NOT_APPLIED));
        C2OperationAck c2OperationAck = new C2OperationAck();
        c2OperationAck.setOperationId(c2Operation.getIdentifier());
        c2OperationAck.setOperationState(c2OperationState);
        sendAcknowledge(c2OperationAck);
    }

    private Optional<C2OperationState> initRestart(C2Operation c2Operation) {
        try {
            LOGGER.debug("Restart initiated");
            this.operationQueueDAO.save(OperationQueue.create(c2Operation, this.c2Operations));
            return this.c2OperationRestartHandler.handleRestart(c2Operation).map(this::c2OperationState);
        } catch (Exception e) {
            LOGGER.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e);
            return Optional.of(c2OperationState(C2OperationState.OperationState.NOT_APPLIED));
        }
    }

    private C2OperationState c2OperationState(C2OperationState.OperationState operationState) {
        C2OperationState c2OperationState = new C2OperationState();
        c2OperationState.setState(operationState);
        return c2OperationState;
    }

    private void sendAcknowledge(C2OperationAck c2OperationAck) {
        try {
            this.client.acknowledgeOperation(c2OperationAck);
        } catch (Exception e) {
            LOGGER.error("Failed to send acknowledge", e);
        }
    }

    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
        return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
    }

    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
        Optional map = Optional.ofNullable(c2OperationAck).map((v0) -> {
            return v0.getOperationState();
        }).map((v0) -> {
            return v0.getState();
        });
        C2OperationState.OperationState operationState = C2OperationState.OperationState.FULLY_APPLIED;
        Objects.requireNonNull(operationState);
        return map.filter((v1) -> {
            return r1.equals(v1);
        }).isPresent();
    }
}
