/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.engine;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.stateless.engine.DataflowAbortedException;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.FlowPurgeAction;
import org.apache.nifi.stateless.flow.CanceledTriggerResult;
import org.apache.nifi.stateless.flow.DataflowTriggerContext;
import org.apache.nifi.stateless.flow.ExceptionalTriggerResult;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stateless.session.StatelessProcessSession;
import org.apache.nifi.stream.io.StreamUtils;

public class StandardExecutionProgress
implements ExecutionProgress {
    private final ProcessGroup rootGroup;
    private final List<FlowFileQueue> internalFlowFileQueues;
    private final ContentRepository contentRepository;
    private final BlockingQueue<TriggerResult> resultQueue;
    private final Set<String> failurePortNames;
    private final AsynchronousCommitTracker commitTracker;
    private final StatelessStateManagerProvider stateManagerProvider;
    private final DataflowTriggerContext triggerContext;
    private final FlowPurgeAction purgeAction;
    private final List<StatelessProcessSession> createdSessions = new ArrayList<StatelessProcessSession>();
    private final BlockingQueue<ExecutionProgress.CompletionAction> completionActionQueue;
    private volatile boolean canceled = false;
    private volatile boolean failed = false;
    private volatile ExecutionProgress.CompletionAction completionAction = null;

    public StandardExecutionProgress(ProcessGroup rootGroup, List<FlowFileQueue> internalFlowFileQueues, BlockingQueue<TriggerResult> resultQueue, RepositoryContextFactory repositoryContextFactory, Set<String> failurePortNames, AsynchronousCommitTracker commitTracker, StatelessStateManagerProvider stateManagerProvider, DataflowTriggerContext triggerContext, FlowPurgeAction purgeAction) {
        this.rootGroup = rootGroup;
        this.internalFlowFileQueues = internalFlowFileQueues;
        this.resultQueue = resultQueue;
        this.contentRepository = repositoryContextFactory.getContentRepository();
        this.failurePortNames = failurePortNames;
        this.commitTracker = commitTracker;
        this.stateManagerProvider = stateManagerProvider;
        this.triggerContext = triggerContext;
        this.purgeAction = purgeAction;
        this.completionActionQueue = new LinkedBlockingQueue<ExecutionProgress.CompletionAction>();
    }

    @Override
    public boolean isFailurePort(String portName) {
        return this.failurePortNames.contains(portName);
    }

    @Override
    public boolean isTerminalPort(Connectable connectable) {
        if (connectable == null) {
            return false;
        }
        if (connectable.getConnectableType() != ConnectableType.OUTPUT_PORT) {
            return false;
        }
        return connectable.getProcessGroup() == this.rootGroup;
    }

    @Override
    public synchronized void registerCreatedSession(StatelessProcessSession session) {
        this.createdSessions.add(session);
    }

    private synchronized void rollbackActiveSessions() {
        for (StatelessProcessSession session : this.createdSessions) {
            try {
                session.rollback();
            }
            catch (TerminatedTaskException terminatedTaskException) {}
        }
        this.createdSessions.clear();
    }

    @Override
    public boolean isCanceled() {
        if (this.canceled) {
            return true;
        }
        boolean aborted = this.triggerContext.isAbort();
        if (aborted) {
            this.notifyExecutionCanceled();
            return true;
        }
        return false;
    }

    @Override
    public boolean isDataQueued() {
        for (FlowFileQueue queue : this.internalFlowFileQueues) {
            if (queue.isActiveQueueEmpty() && !queue.isUnacknowledgedFlowFile()) continue;
            return true;
        }
        return false;
    }

    @Override
    public ExecutionProgress.CompletionAction awaitCompletionAction() throws InterruptedException {
        ExecutionProgress.CompletionAction completionAction;
        if (this.isCanceled()) {
            return ExecutionProgress.CompletionAction.CANCEL;
        }
        ExecutionProgress.CompletionAction existingAction = this.getExistingCompletionAction();
        if (existingAction != null) {
            return existingAction;
        }
        TriggerResult triggerResult = this.createResult(null, null);
        this.resultQueue.offer(triggerResult);
        this.completionAction = completionAction = this.completionActionQueue.take();
        return completionAction;
    }

    @Override
    public void enqueueTriggerResult(Runnable onAcknowledge, Consumer<Throwable> onFailure) {
        if (this.isCanceled()) {
            onFailure.accept(new RuntimeException("Dataflow canceled"));
            return;
        }
        ExecutionProgress.CompletionAction existingAction = this.getExistingCompletionAction();
        if (existingAction != null) {
            onAcknowledge.run();
            return;
        }
        TriggerResult triggerResult = this.createResult(onAcknowledge, onFailure);
        this.resultQueue.offer(triggerResult);
    }

    private synchronized ExecutionProgress.CompletionAction getExistingCompletionAction() {
        ExecutionProgress.CompletionAction existingAction = this.completionAction;
        if (existingAction != null) {
            return existingAction;
        }
        ExecutionProgress.CompletionAction enqueuedAction = (ExecutionProgress.CompletionAction)((Object)this.completionActionQueue.poll());
        if (enqueuedAction != null) {
            this.completionAction = enqueuedAction;
            return enqueuedAction;
        }
        return null;
    }

    private TriggerResult createResult(final Runnable onAcknowledge, final Consumer<Throwable> onFailure) {
        final Map<String, List<FlowFile>> outputFlowFiles = this.drainOutputQueues();
        for (String failurePortName : this.failurePortNames) {
            List<FlowFile> flowFilesForPort = outputFlowFiles.get(failurePortName);
            if (flowFilesForPort == null || flowFilesForPort.isEmpty()) continue;
            throw new FailurePortEncounteredException("FlowFile was transferred to Port " + failurePortName + ", which is marked as a Failure Port", failurePortName);
        }
        final boolean canceled = this.isCanceled();
        return new TriggerResult(){
            private volatile Throwable abortCause = null;

            public boolean isSuccessful() {
                return this.abortCause == null;
            }

            public boolean isCanceled() {
                return canceled;
            }

            public Optional<Throwable> getFailureCause() {
                return Optional.ofNullable(this.abortCause);
            }

            public Map<String, List<FlowFile>> getOutputFlowFiles() {
                return outputFlowFiles;
            }

            public List<FlowFile> getOutputFlowFiles(String portName) {
                return outputFlowFiles.computeIfAbsent(portName, name -> Collections.emptyList());
            }

            public InputStream readContent(FlowFile flowFile) throws IOException {
                if (!(flowFile instanceof FlowFileRecord)) {
                    throw new IllegalArgumentException("FlowFile was not created by this flow");
                }
                FlowFileRecord flowFileRecord = (FlowFileRecord)flowFile;
                ContentClaim contentClaim = flowFileRecord.getContentClaim();
                InputStream in = StandardExecutionProgress.this.contentRepository.read(contentClaim);
                long offset = flowFileRecord.getContentClaimOffset();
                if (offset > 0L) {
                    StreamUtils.skip((InputStream)in, (long)offset);
                }
                return new LimitedInputStream(in, flowFile.getSize());
            }

            public byte[] readContentAsByteArray(FlowFile flowFile) throws IOException {
                if (!(flowFile instanceof FlowFileRecord)) {
                    throw new IllegalArgumentException("FlowFile was not created by this flow");
                }
                if (flowFile.getSize() > Integer.MAX_VALUE) {
                    throw new IOException("Cannot return contents of " + String.valueOf(flowFile) + " as a byte array because the contents exceed the maximum length supported for byte arrays (2147483647 bytes)");
                }
                FlowFileRecord flowFileRecord = (FlowFileRecord)flowFile;
                long size = flowFileRecord.getSize();
                byte[] flowFileContents = new byte[(int)size];
                try (InputStream in = this.readContent(flowFile);){
                    StreamUtils.fillBuffer((InputStream)in, (byte[])flowFileContents);
                }
                return flowFileContents;
            }

            public void acknowledge() {
                StandardExecutionProgress.this.commitTracker.triggerCallbacks();
                StandardExecutionProgress.this.stateManagerProvider.commitUpdates();
                StandardExecutionProgress.this.completionActionQueue.offer(ExecutionProgress.CompletionAction.COMPLETE);
                StandardExecutionProgress.this.contentRepository.purge();
                if (onAcknowledge != null) {
                    onAcknowledge.run();
                }
            }

            public void abort(Throwable cause) {
                this.abortCause = new DataflowAbortedException("Dataflow was aborted", cause);
                StandardExecutionProgress.this.notifyExecutionFailed(this.abortCause);
                if (onFailure != null) {
                    onFailure.accept(cause);
                }
            }
        };
    }

    @Override
    public void notifyExecutionCanceled() {
        if (this.canceled || this.failed) {
            return;
        }
        this.canceled = true;
        this.commitTracker.triggerFailureCallbacks(new RuntimeException("Dataflow Canceled"));
        this.stateManagerProvider.rollbackUpdates();
        this.completionActionQueue.offer(ExecutionProgress.CompletionAction.CANCEL);
        this.rollbackActiveSessions();
        this.purgeAction.purge();
        this.resultQueue.offer(new CanceledTriggerResult());
    }

    @Override
    public void notifyExecutionFailed(Throwable cause) {
        if (this.canceled || this.failed) {
            return;
        }
        this.failed = true;
        this.commitTracker.triggerFailureCallbacks(cause);
        this.stateManagerProvider.rollbackUpdates();
        this.completionActionQueue.offer(ExecutionProgress.CompletionAction.CANCEL);
        this.rollbackActiveSessions();
        this.purgeAction.purge();
        this.resultQueue.offer(new ExceptionalTriggerResult(cause));
    }

    public Map<String, List<FlowFile>> drainOutputQueues() {
        HashMap<String, List<FlowFile>> flowFileMap = new HashMap<String, List<FlowFile>>();
        for (Port port : this.rootGroup.getOutputPorts()) {
            List<FlowFile> flowFiles = this.drainOutputQueues(port);
            flowFileMap.put(port.getName(), flowFiles);
        }
        return flowFileMap;
    }

    private List<FlowFile> drainOutputQueues(Port port) {
        List incomingConnections = port.getIncomingConnections();
        if (incomingConnections.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<FlowFile> portFlowFiles = new ArrayList<FlowFile>();
        for (Connection connection : incomingConnections) {
            FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
            if (!(flowFileQueue instanceof DrainableFlowFileQueue)) continue;
            DrainableFlowFileQueue drainableQueue = (DrainableFlowFileQueue)connection.getFlowFileQueue();
            ArrayList<FlowFileRecord> flowFileRecords = new ArrayList<FlowFileRecord>(drainableQueue.size().getObjectCount());
            drainableQueue.drainTo(flowFileRecords);
            portFlowFiles.addAll(flowFileRecords);
            for (FlowFileRecord flowFileRecord : flowFileRecords) {
                this.contentRepository.decrementClaimantCount(flowFileRecord.getContentClaim());
            }
        }
        return portFlowFiles;
    }
}

