/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.c2.client.service;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class C2OperationManager
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(C2OperationManager.class);
    private final C2Client client;
    private final C2OperationHandlerProvider c2OperationHandlerProvider;
    private final ReentrantLock heartbeatLock;
    private final OperationQueueDAO operationQueueDAO;
    private final C2OperationRestartHandler c2OperationRestartHandler;
    private final BlockingQueue<C2Operation> c2Operations;

    public C2OperationManager(C2Client client, C2OperationHandlerProvider c2OperationHandlerProvider, ReentrantLock heartbeatLock, OperationQueueDAO operationQueueDAO, C2OperationRestartHandler c2OperationRestartHandler) {
        this.client = client;
        this.c2OperationHandlerProvider = c2OperationHandlerProvider;
        this.heartbeatLock = heartbeatLock;
        this.operationQueueDAO = operationQueueDAO;
        this.c2OperationRestartHandler = c2OperationRestartHandler;
        this.c2Operations = new LinkedBlockingQueue<C2Operation>();
    }

    public void add(C2Operation c2Operation) {
        try {
            this.c2Operations.put(c2Operation);
        }
        catch (InterruptedException e) {
            LOGGER.warn("Thread was interrupted", (Throwable)e);
        }
    }

    @Override
    public void run() {
        this.processRestartState();
        this.processOperationsInLoop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void processOperationsInLoop() {
        while (true) {
            C2OperationAck operationAck;
            C2Operation operation;
            try {
                operation = this.c2Operations.take();
            }
            catch (InterruptedException e) {
                LOGGER.warn("Thread was interrupted", (Throwable)e);
                return;
            }
            LOGGER.debug("Processing operation {}", (Object)operation);
            C2OperationHandler operationHandler = this.c2OperationHandlerProvider.getHandlerForOperation(operation).orElse(null);
            if (operationHandler == null) {
                LOGGER.debug("No handler is present for C2 Operation {}, available handlers {}", (Object)operation, this.c2OperationHandlerProvider.getHandlers());
                continue;
            }
            try {
                operationAck = operationHandler.handle(operation);
            }
            catch (Exception e) {
                LOGGER.error("Failed to process operation " + String.valueOf(operation), (Throwable)e);
                continue;
            }
            if (!this.requiresRestart(operationHandler, operationAck)) {
                LOGGER.debug("No restart is required. Sending ACK to C2 server {}", (Object)operationAck);
                this.sendAcknowledge(operationAck);
                continue;
            }
            this.heartbeatLock.lock();
            LOGGER.debug("Restart is required. Heartbeats are stopped until restart is completed");
            Optional<C2OperationState> restartState = this.initRestart(operation);
            if (!restartState.isPresent()) {
                LOGGER.debug("Restart in progress, stopping C2OperationManager");
                return;
            }
            try {
                C2OperationState failedState = restartState.get();
                LOGGER.debug("Restart handler returned with a failed state {}", (Object)failedState);
                operationAck.setOperationState(failedState);
                this.sendAcknowledge(operationAck);
                continue;
            }
            finally {
                this.operationQueueDAO.cleanup();
                LOGGER.debug("Heartbeats are enabled again");
                this.heartbeatLock.unlock();
                continue;
            }
            break;
        }
    }

    private void processRestartState() {
        Optional<OperationQueue> operationQueue = this.operationQueueDAO.load();
        operationQueue.map(OperationQueue::getRemainingOperations).filter(Predicate.not(List::isEmpty)).ifPresent(this::processRemainingOperations);
        operationQueue.map(OperationQueue::getCurrentOperation).ifPresentOrElse(this::processCurrentOperation, () -> LOGGER.debug("No operation to acknowledge to C2 server"));
        operationQueue.ifPresent(__ -> this.operationQueueDAO.cleanup());
    }

    private void processRemainingOperations(List<C2Operation> remainingOperations) {
        LOGGER.debug("Found remaining operations operations after restart. Heartbeats are stopped until processing is completed");
        this.heartbeatLock.lock();
        try {
            LinkedList<C2Operation> mergedOperations = new LinkedList<C2Operation>();
            mergedOperations.addAll(remainingOperations);
            mergedOperations.addAll(this.c2Operations);
            this.c2Operations.clear();
            mergedOperations.forEach(this.c2Operations::add);
        }
        catch (Exception e) {
            LOGGER.warn("Unable to recover operations from operation queue", (Throwable)e);
        }
        finally {
            this.heartbeatLock.unlock();
            LOGGER.debug("Heartbeat lock released");
        }
    }

    private void processCurrentOperation(C2Operation operation) {
        LOGGER.debug("Found operation {} to acknowledge to C2 server", (Object)operation);
        C2OperationState c2OperationState = this.c2OperationRestartHandler.waitForResponse().map(this::c2OperationState).orElse(this.c2OperationState(C2OperationState.OperationState.NOT_APPLIED));
        C2OperationAck c2OperationAck = new C2OperationAck();
        c2OperationAck.setOperationId(operation.getIdentifier());
        c2OperationAck.setOperationState(c2OperationState);
        this.sendAcknowledge(c2OperationAck);
    }

    private Optional<C2OperationState> initRestart(C2Operation operation) {
        try {
            LOGGER.debug("Restart initiated");
            OperationQueue operationQueue = OperationQueue.create(operation, this.c2Operations);
            this.operationQueueDAO.save(operationQueue);
            return this.c2OperationRestartHandler.handleRestart(operation).map(this::c2OperationState);
        }
        catch (Exception e) {
            LOGGER.error("Failed to initiate restart. Dropping operation and continue with remaining operations", (Throwable)e);
            return Optional.of(this.c2OperationState(C2OperationState.OperationState.NOT_APPLIED));
        }
    }

    private C2OperationState c2OperationState(C2OperationState.OperationState operationState) {
        C2OperationState c2OperationState = new C2OperationState();
        c2OperationState.setState(operationState);
        return c2OperationState;
    }

    private void sendAcknowledge(C2OperationAck operationAck) {
        try {
            this.client.acknowledgeOperation(operationAck);
        }
        catch (Exception e) {
            LOGGER.error("Failed to send acknowledge", (Throwable)e);
        }
    }

    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
        return c2OperationHandler.requiresRestart() && this.isOperationFullyApplied(c2OperationAck);
    }

    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
        return Optional.ofNullable(c2OperationAck).map(C2OperationAck::getOperationState).map(C2OperationState::getState).filter(arg_0 -> C2OperationState.OperationState.FULLY_APPLIED.equals(arg_0)).isPresent();
    }
}

