/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.flow;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.NumberFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.engine.StandardExecutionProgress;
import org.apache.nifi.stateless.flow.CachingProcessContextFactory;
import org.apache.nifi.stateless.flow.CanceledTriggerResult;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.DataflowTriggerContext;
import org.apache.nifi.stateless.flow.ExceptionalTriggerResult;
import org.apache.nifi.stateless.flow.StandardStatelessDataflowValidation;
import org.apache.nifi.stateless.flow.StandardStatelessFlowCurrent;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
import org.apache.nifi.stateless.flow.StatelessDataflowValidation;
import org.apache.nifi.stateless.flow.TransactionThresholdMeter;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.Connectables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardStatelessFlow
implements StatelessDataflow {
    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlow.class);
    // Run durations above this threshold are logged in millis, otherwise in nanos (see executeDataflow).
    private static final long TEN_MILLIS_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
    // Process Group identifier that isTerminalPort() compares an Output Port's group against.
    private static final String PARENT_FLOW_GROUP_ID = "stateless-flow";
    private final ProcessGroup rootGroup;
    // Snapshot of rootGroup.findAllConnections() taken in the constructor.
    private final List<Connection> allConnections;
    private final List<ReportingTaskNode> reportingTasks;
    // Populated in the constructor by the discoverRoot* methods and handed to the flow builder in executeDataflow.
    private final Set<Connectable> rootConnectables;
    // Input Ports of the root group keyed by Port name.
    private final Map<String, Port> inputPortsByName;
    private final ControllerServiceProvider controllerServiceProvider;
    // The factory passed to the constructor, wrapped in a CachingProcessContextFactory.
    private final ProcessContextFactory processContextFactory;
    private final RepositoryContextFactory repositoryContextFactory;
    // Queues of connections whose source is not a root Input Port and whose destination is not a root Output Port.
    private final List<FlowFileQueue> internalFlowFileQueues;
    private final DataflowDefinition dataflowDefinition;
    private final StatelessStateManagerProvider stateManagerProvider;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ProcessScheduler processScheduler;
    private final AsynchronousCommitTracker tracker;
    private final TransactionThresholdMeter transactionThresholdMeter;
    // Tasks registered via scheduleBackgroundTask(); initialize() schedules them on backgroundTaskExecutor.
    private final List<BackgroundTask> backgroundTasks = new ArrayList<BackgroundTask>();
    private final BulletinRepository bulletinRepository;
    private final LifecycleStateManager lifecycleStateManager;
    // Timeout applied when waiting for Controller Services to enable and for Processors to start.
    private final long componentEnableTimeoutMillis;
    // Copy of the root group's Input Ports taken in the constructor.
    private final List<Port> inputPorts;
    private final StatsTracker statsTracker;
    // Single-threaded executor created in initialize(); trigger() submits dataflow runs to it.
    private volatile ExecutorService runDataflowExecutor;
    // Single-threaded daemon scheduler created in initialize() for the registered background tasks.
    private volatile ScheduledExecutorService backgroundTaskExecutor;
    // Set to true as soon as initialize() starts doing work; later initialize() calls return immediately.
    private volatile boolean initialized = false;
    // Cached answer for isStateful(); null until a value has been stored.
    private volatile Boolean stateful = null;
    // Set to true on the first shutdown() call; later shutdown() calls return immediately.
    private volatile boolean shutdown = false;
    // Taken from the initialization context in initialize(); when false, shutdown() leaves Controller Services alone.
    private volatile boolean manageControllerServices = true;

    /**
     * Convenience constructor that delegates to the 13-argument constructor, supplying a new
     * {@link AsynchronousCommitTracker} created from {@code rootGroup} as the commit tracker.
     * All other arguments are passed through unchanged.
     */
    public StandardStatelessFlow(ProcessGroup rootGroup, List<ReportingTaskNode> reportingTasks, ControllerServiceProvider controllerServiceProvider, ProcessContextFactory processContextFactory, RepositoryContextFactory repositoryContextFactory, DataflowDefinition dataflowDefinition, StatelessStateManagerProvider stateManagerProvider, ProcessScheduler processScheduler, BulletinRepository bulletinRepository, LifecycleStateManager lifecycleStateManager, Duration componentEnableTimeout, StatsTracker statsTracker) {
        this(rootGroup, reportingTasks, controllerServiceProvider, processContextFactory, repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository, lifecycleStateManager, componentEnableTimeout, statsTracker, new AsynchronousCommitTracker(rootGroup));
    }

    public StandardStatelessFlow(ProcessGroup rootGroup, List<ReportingTaskNode> reportingTasks, ControllerServiceProvider controllerServiceProvider, ProcessContextFactory processContextFactory, RepositoryContextFactory repositoryContextFactory, DataflowDefinition dataflowDefinition, StatelessStateManagerProvider stateManagerProvider, ProcessScheduler processScheduler, BulletinRepository bulletinRepository, LifecycleStateManager lifecycleStateManager, Duration componentEnableTimeout, StatsTracker statsTracker, AsynchronousCommitTracker commitTracker) {
        this.rootGroup = rootGroup;
        this.allConnections = rootGroup.findAllConnections();
        this.reportingTasks = reportingTasks;
        this.controllerServiceProvider = controllerServiceProvider;
        this.processContextFactory = new CachingProcessContextFactory(processContextFactory);
        this.repositoryContextFactory = repositoryContextFactory;
        this.dataflowDefinition = dataflowDefinition;
        this.stateManagerProvider = stateManagerProvider;
        this.processScheduler = processScheduler;
        this.transactionThresholdMeter = new TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
        this.bulletinRepository = bulletinRepository;
        this.componentEnableTimeoutMillis = componentEnableTimeout.toMillis();
        this.tracker = commitTracker;
        this.lifecycleStateManager = lifecycleStateManager;
        this.inputPorts = new ArrayList<Port>(rootGroup.getInputPorts());
        this.statsTracker = statsTracker;
        this.rootConnectables = new HashSet<Connectable>();
        this.inputPortsByName = this.mapInputPortsToName(rootGroup);
        this.discoverRootProcessors(rootGroup, this.rootConnectables);
        this.discoverRootRemoteGroupPorts(rootGroup, this.rootConnectables);
        this.discoverRootInputPorts(rootGroup, this.rootConnectables);
        this.internalFlowFileQueues = this.discoverInternalFlowFileQueues(rootGroup);
    }

    /**
     * Collects the distinct FlowFile queues of every connection in {@code group} whose source is not
     * one of the root group's input ports and whose destination is not one of the root group's output ports.
     *
     * @param group the group whose connections are inspected
     * @return a new mutable list of the matching queues, in encounter order
     */
    private List<FlowFileQueue> discoverInternalFlowFileQueues(ProcessGroup group) {
        final Set<?> rootInputs = this.rootGroup.getInputPorts();
        final Set<?> rootOutputs = this.rootGroup.getOutputPorts();
        final List<Connection> candidates = group.findAllConnections();
        return candidates.stream()
            .filter(candidate -> !rootInputs.contains(candidate.getSource()) && !rootOutputs.contains(candidate.getDestination()))
            .map(candidate -> candidate.getFlowFileQueue())
            .distinct()
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Builds a lookup of the group's input ports keyed by port name. If two ports share a name,
     * the one encountered later replaces the earlier entry.
     *
     * @param group the group whose input ports are indexed
     * @return a new mutable map from port name to port
     */
    private Map<String, Port> mapInputPortsToName(ProcessGroup group) {
        final Map<String, Port> portsByName = new HashMap<>();
        group.getInputPorts().forEach(inputPort -> portsByName.put(inputPort.getName(), inputPort));
        return portsByName;
    }

    /**
     * Adds to {@code rootComponents} the destination of every connection leaving one of the group's
     * input ports, skipping destinations that {@link #isTerminalPort(Connectable)} reports as terminal.
     *
     * @param processGroup the group whose input ports are followed
     * @param rootComponents the set that receives the discovered components
     */
    private void discoverRootInputPorts(ProcessGroup processGroup, Set<Connectable> rootComponents) {
        processGroup.getInputPorts().stream()
            .flatMap(inputPort -> inputPort.getConnections().stream())
            .map(outgoing -> outgoing.getDestination())
            .filter(destination -> !StandardStatelessFlow.isTerminalPort(destination))
            .forEach(rootComponents::add);
    }

    /**
     * Adds to {@code rootComponents} every processor found by {@code findAllProcessors()} for which
     * {@code Connectables.hasNonLoopConnection} returns false.
     *
     * @param processGroup the group to search
     * @param rootComponents the set that receives the discovered processors
     */
    private void discoverRootProcessors(ProcessGroup processGroup, Set<Connectable> rootComponents) {
        processGroup.findAllProcessors().stream()
            .filter(candidate -> !Connectables.hasNonLoopConnection(candidate))
            .forEach(rootComponents::add);
    }

    /**
     * Adds to {@code rootComponents} every output port of every Remote Process Group in the group
     * that has at least one connection.
     *
     * @param processGroup the group whose Remote Process Groups are inspected
     * @param rootComponents the set that receives the discovered ports
     */
    private void discoverRootRemoteGroupPorts(ProcessGroup processGroup, Set<Connectable> rootComponents) {
        for (final RemoteProcessGroup remoteGroup : processGroup.findAllRemoteProcessGroups()) {
            for (final RemoteGroupPort outputPort : remoteGroup.getOutputPorts()) {
                if (!outputPort.getConnections().isEmpty()) {
                    rootComponents.add(outputPort);
                }
            }
        }
    }

    /**
     * Tells whether the connectable is an Output Port whose Process Group has the identifier
     * {@code "stateless-flow"}. A debug message is logged when that is the case.
     *
     * @param connectable the component to test
     * @return {@code true} only for an Output Port belonging to the group with that identifier
     */
    public static boolean isTerminalPort(Connectable connectable) {
        if (connectable.getConnectableType() != ConnectableType.OUTPUT_PORT) {
            return false;
        }

        final String owningGroupId = connectable.getProcessGroup().getIdentifier();
        final boolean terminal = PARENT_FLOW_GROUP_ID.equals(owningGroupId);
        if (terminal) {
            logger.debug("FlowFiles queued for {} but this is a Terminal Port. Will not trigger Port to run.", connectable);
        }
        return terminal;
    }

    /**
     * Initializes the dataflow: validates components, optionally enables Controller Services and waits
     * for them, validates again, starts Processors, Remote Process Groups and Reporting Tasks, and then
     * creates the executors used to run the dataflow and the registered background tasks.
     * <p>
     * Calling this method again after it has started once is a no-op. If any step inside the guarded
     * section throws, the process scheduler and any executors already created are shut down and the
     * original throwable is rethrown.
     *
     * @param initializationContext tells whether this flow should enable (and later disable) Controller Services
     */
    public void initialize(StatelessDataflowInitializationContext initializationContext) {
        if (this.initialized) {
            logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this);
            return;
        }
        this.initialized = true;
        this.manageControllerServices = initializationContext.isEnableControllerServices();

        // The reported total must cover everything below, including the time spent enabling services,
        // so the clock starts before any initialization work is done.
        final long initializationStart = System.currentTimeMillis();

        // First validation pass; the result is not inspected here.
        this.performValidation();
        try {
            final long serviceEnableStart = System.currentTimeMillis();
            if (initializationContext.isEnableControllerServices()) {
                this.enableControllerServices(this.rootGroup);
                this.waitForServicesEnabled(this.rootGroup);
            } else {
                logger.debug("Skipping Controller Service enablement because initializationContext.isEnableControllerServices() returned false");
            }
            final long serviceEnableMillis = System.currentTimeMillis() - serviceEnableStart;

            // Validate again now that services have had the chance to enable.
            final long validationStart = System.currentTimeMillis();
            final StatelessDataflowValidation validationResult = this.performValidation();
            final long validationMillis = System.currentTimeMillis() - validationStart;
            if (!validationResult.isValid()) {
                logger.warn("{} Attempting to initialize dataflow but found at least one invalid component: {}", this, validationResult);
            }

            this.startProcessors(this.rootGroup);
            this.startRemoteGroups(this.rootGroup);
            this.startReportingTasks();

            final long initializationMillis = System.currentTimeMillis() - initializationStart;
            logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)", initializationMillis, validationMillis, serviceEnableMillis);

            final String flowName = this.dataflowDefinition.getFlowName();
            final String threadName = flowName == null || flowName.trim().isEmpty() ? "Run Dataflow" : "Run Dataflow " + flowName;
            this.runDataflowExecutor = Executors.newFixedThreadPool(1, this.createNamedThreadFactory(threadName, false));
            this.backgroundTaskExecutor = Executors.newScheduledThreadPool(1, this.createNamedThreadFactory("Background Tasks", true));
            this.backgroundTasks.forEach(task -> this.backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(), task.getSchedulingPeriod(), task.getSchedulingPeriod(), task.getSchedulingUnit()));
        }
        catch (Throwable t) {
            // Release whatever was started before propagating the failure.
            this.processScheduler.shutdown();
            if (this.runDataflowExecutor != null) {
                this.runDataflowExecutor.shutdownNow();
            }
            if (this.backgroundTaskExecutor != null) {
                this.backgroundTaskExecutor.shutdownNow();
            }
            throw t;
        }
    }

    /**
     * Returns a thread factory whose threads come from {@link Executors#defaultThreadFactory()} and are
     * then given the supplied name and daemon flag.
     *
     * @param name the name assigned to every thread the factory creates
     * @param daemon whether created threads are daemon threads
     * @return the configured thread factory
     */
    private ThreadFactory createNamedThreadFactory(String name, boolean daemon) {
        return runnable -> {
            final Thread created = Executors.defaultThreadFactory().newThread(runnable);
            created.setDaemon(daemon);
            created.setName(name);
            return created;
        };
    }

    /**
     * Registers a task in the list of background tasks; {@code initialize} schedules the registered
     * tasks with a fixed delay equal to the given period.
     *
     * @param task the work to run
     * @param period the scheduling period, also used as the initial delay
     * @param unit the unit of {@code period}
     */
    public void scheduleBackgroundTask(Runnable task, long period, TimeUnit unit) {
        final BackgroundTask registration = new BackgroundTask(task, period, unit);
        this.backgroundTasks.add(registration);
    }

    /**
     * Waits for every Controller Service in the group (including descendants) to report enabled,
     * sharing a single deadline of {@code componentEnableTimeoutMillis} across all services.
     *
     * @param group the group whose services are awaited
     * @throws RuntimeException if the calling thread is interrupted while waiting (interrupt status is restored)
     * @throws IllegalStateException if a service is not enabled and the deadline has passed
     */
    private void waitForServicesEnabled(ProcessGroup group) {
        final long deadline = System.currentTimeMillis() + this.componentEnableTimeoutMillis;
        for (final ControllerServiceNode service : group.findAllControllerServices()) {
            final boolean enabled;
            try {
                final long remainingMillis = deadline - System.currentTimeMillis();
                enabled = service.awaitEnabled(remainingMillis, TimeUnit.MILLISECONDS);
            }
            catch (final InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for Controller Services to enable", interrupted);
            }

            // Only a service that is still not enabled once the deadline has passed is a failure.
            final boolean timedOut = !enabled && System.currentTimeMillis() > deadline;
            if (timedOut) {
                final String allErrors = this.performValidation().toString();
                throw new IllegalStateException("At least one Controller Service never finished enabling. All validation errors: " + allErrors);
            }
        }
    }

    /**
     * Starts every Reporting Task of this flow, in list order.
     */
    private void startReportingTasks() {
        for (final var taskNode : this.reportingTasks) {
            this.startReportingTask(taskNode);
        }
    }

    /**
     * Hands the given Reporting Task to the process scheduler for scheduling.
     *
     * @param taskNode the Reporting Task to schedule
     */
    private void startReportingTask(ReportingTaskNode taskNode) {
        this.processScheduler.schedule(taskNode);
    }

    /**
     * Shuts the dataflow down. Only the first call does any work; later calls return immediately.
     * <p>
     * The sequence is: stop the background task executor, unschedule running processors, decide whether
     * the pending session callbacks succeed or fail, stop the dataflow executor, stop and terminate
     * components, shut down Remote Process Groups, unschedule Reporting Tasks, disable Controller
     * Services (only when this flow manages them), and finally shut down the state manager provider,
     * the process scheduler and the repository context factory.
     *
     * @param triggerComponentShutdown whether the root group, Controller Services and Reporting Tasks
     *        should also receive their shutdown calls
     * @param interruptProcessors whether processors that do not finish in time (or at all, when the
     *        graceful period is zero) cause the dataflow executor to be stopped with {@code shutdownNow()}
     * @param gracefulShutdownDuration how long to wait for processors to finish and for components to stop
     */
    public void shutdown(boolean triggerComponentShutdown, boolean interruptProcessors, Duration gracefulShutdownDuration) {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        logger.info("Shutting down dataflow {}", this.rootGroup.getName());
        if (this.backgroundTaskExecutor != null) {
            this.backgroundTaskExecutor.shutdown();
        }

        logger.info("Stopping all components");
        final HashSet<ProcessorNode> runningProcessors = new HashSet<>(this.rootGroup.findAllProcessors());
        this.unscheduleProcessors(runningProcessors);

        boolean interrupt = false;
        if (interruptProcessors) {
            if (gracefulShutdownDuration.isZero()) {
                logger.info("Shutting down all components immediately without waiting for graceful shutdown period");
                this.tracker.triggerFailureCallbacks(new TerminatedTaskException());
                interrupt = true;
            } else {
                final boolean gracefullyStopped = this.waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
                if (gracefullyStopped) {
                    if (this.rootGroup.isDataQueuedForProcessing()) {
                        final int queuedCount = this.allConnections.stream().mapToInt(conn -> conn.getFlowFileQueue().size().getObjectCount()).sum();
                        logger.warn("All Processors finished running but {} FlowFiles remain queued; treating session as failure", queuedCount);
                        this.tracker.triggerFailureCallbacks(new TerminatedTaskException());
                    } else {
                        logger.info("All Processors have finished running; triggering session callbacks");
                        this.tracker.triggerCallbacks();
                    }
                } else {
                    logger.warn("{} Processors did not finish running within the graceful shutdown period of {} millis. Interrupting all running components. Processors still running: {}", runningProcessors.size(), gracefulShutdownDuration.toMillis(), runningProcessors);
                    this.tracker.triggerFailureCallbacks(new TerminatedTaskException());
                    interrupt = true;
                }
            }
        } else {
            // Without interruption the wait result is not used; only leftover queued data decides the outcome.
            this.waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
            if (this.rootGroup.isDataQueuedForProcessing()) {
                final int queuedCount = this.allConnections.stream().mapToInt(conn -> conn.getFlowFileQueue().size().getObjectCount()).sum();
                logger.warn("{} FlowFiles remain queued after shutdown; treating session as failure", queuedCount);
                this.tracker.triggerFailureCallbacks(new TerminatedTaskException());
            } else {
                this.tracker.triggerCallbacks();
            }
        }

        if (this.runDataflowExecutor != null) {
            if (interrupt) {
                this.runDataflowExecutor.shutdownNow();
            } else {
                this.runDataflowExecutor.shutdown();
            }
        }

        final CompletableFuture<?> stopFuture = this.rootGroup.stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ONSTOPPED);
        try {
            stopFuture.get(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            // Stopping is best effort: keep going with the remaining shutdown steps, but record why
            // the components did not stop cleanly instead of discarding the failure.
            logger.warn("Failed to stop all components of dataflow {} within {} millis; continuing with shutdown", this.rootGroup.getName(), gracefulShutdownDuration.toMillis(), e);
        }

        for (final ProcessorNode processorNode : this.rootGroup.findAllProcessors()) {
            this.processScheduler.terminateProcessor(processorNode);
        }
        this.rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
        if (triggerComponentShutdown) {
            this.rootGroup.shutdown();
        }
        this.reportingTasks.forEach(this.processScheduler::unschedule);

        final var allControllerServices = this.rootGroup.findAllControllerServices();
        if (this.manageControllerServices) {
            logger.info("Disabling {} Controller Services", allControllerServices.size());
            if (!allControllerServices.isEmpty()) {
                try {
                    this.controllerServiceProvider.disableControllerServicesAsync(allControllerServices).get();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    logger.error("Failed to properly disable one or more of the following Controller Services due to being interrupted while waiting for them to disable: {}", allControllerServices, ie);
                }
                catch (Exception e) {
                    logger.error("Failed to properly disable one or more of the following Controller Services: {}", allControllerServices, e);
                }
                logger.info("Finished disabling all Controller Services");
            }
        }

        this.stateManagerProvider.shutdown();
        if (triggerComponentShutdown) {
            if (this.manageControllerServices) {
                allControllerServices.forEach(cs -> this.processScheduler.shutdownControllerService(cs, this.controllerServiceProvider));
            }
            this.reportingTasks.forEach(this.processScheduler::shutdownReportingTask);
        }
        this.processScheduler.shutdown();
        this.repositoryContextFactory.shutdown();
        logger.info("Finished shutting down dataflow");
    }

    /**
     * Triggers the on-unscheduled lifecycle call for every processor in the set that reports itself
     * as running, using a process context obtained from the process context factory.
     *
     * @param runningProcessors the processors to consider; those not running are skipped
     */
    private void unscheduleProcessors(Set<ProcessorNode> runningProcessors) {
        runningProcessors.stream()
            .filter(candidate -> candidate.isRunning())
            .forEach(running -> running.triggerOnUnscheduled(this.processContextFactory.createProcessContext(running)));
    }

    /**
     * Polls every 10 milliseconds until no processor in the set has active threads, the graceful
     * period elapses, or the calling thread is interrupted. Processors observed with zero active
     * threads are removed from {@code runningProcessors}, so on a {@code false} return the set holds
     * the processors that were still running.
     *
     * @param runningProcessors mutable set of processors to wait for; modified in place
     * @param gracefulShutdownDuration the maximum time to wait
     * @return {@code true} if all processors finished; {@code false} on timeout or interruption
     *         (the thread's interrupt status is restored in the latter case)
     */
    private boolean waitForProcessorThreadsToComplete(Set<ProcessorNode> runningProcessors, Duration gracefulShutdownDuration) {
        final long maxEndTime = System.currentTimeMillis() + gracefulShutdownDuration.toMillis();
        while (!runningProcessors.isEmpty()) {
            runningProcessors.removeIf(proc -> proc.getActiveThreadCount() == 0);
            if (runningProcessors.isEmpty()) {
                return true;
            }
            if (System.currentTimeMillis() > maxEndTime) {
                return false;
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                // sleep() cleared the flag when it threw; set it again so callers can see the interruption.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /**
     * Validates every Controller Service and then every Processor found under the root group and
     * gathers the failed validation results of each component that is not valid.
     *
     * @return a validation object built from the map of component to failed validation results
     */
    public StatelessDataflowValidation performValidation() {
        final Map<ComponentNode, List<ValidationResult>> invalidByComponent = new HashMap<>();
        this.rootGroup.findAllControllerServices().forEach(service -> this.performValidation(service, invalidByComponent));
        this.rootGroup.findAllProcessors().forEach(processor -> this.performValidation(processor, invalidByComponent));
        return new StandardStatelessDataflowValidation(invalidByComponent);
    }

    /**
     * Validates a single component. When its status is anything other than {@code VALID}, the
     * component is recorded in {@code resultsMap} together with those of its validation errors that
     * are themselves not valid (possibly an empty list).
     *
     * @param componentNode the component to validate
     * @param resultsMap receives an entry for the component when it is not valid
     */
    private void performValidation(ComponentNode componentNode, Map<ComponentNode, List<ValidationResult>> resultsMap) {
        if (componentNode.performValidation() == ValidationStatus.VALID) {
            return;
        }

        final ArrayList<ValidationResult> failures = componentNode.getValidationErrors().stream()
            .filter(candidate -> !candidate.isValid())
            .collect(Collectors.toCollection(ArrayList::new));
        resultsMap.put(componentNode, failures);
    }

    /**
     * Enables the Controller Services that belong directly to the given group, waits up to
     * {@code componentEnableTimeoutMillis} for the enable request to complete, and then recurses
     * into each child group.
     *
     * @param processGroup the group whose services are enabled
     * @throws IllegalStateException if waiting fails, times out or is interrupted; the message lists
     *         every service of this group that is not in the ENABLED state, and the original
     *         exception is attached as the cause
     */
    private void enableControllerServices(ProcessGroup processGroup) {
        final Set<ControllerServiceNode> services = processGroup.getControllerServices(false);
        final Future<?> future = this.controllerServiceProvider.enableControllerServicesAsync(services);
        try {
            future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise lose the interruption; restore it for callers.
                Thread.currentThread().interrupt();
            }
            final StringBuilder validationMessage = new StringBuilder("The following Controller Services have not fully enabled:\n");
            for (final ControllerServiceNode serviceNode : services) {
                if (serviceNode.getState() == ControllerServiceState.ENABLED) {
                    continue;
                }
                validationMessage.append("Controller Service ").append(serviceNode).append(" has Validation Status ").append(serviceNode.getValidationStatus()).append(" with validation Errors: ").append(serviceNode.getValidationErrors()).append("\n");
            }
            throw new IllegalStateException(validationMessage.toString().trim(), e);
        }
        processGroup.getProcessGroups().forEach(this::enableControllerServices);
    }

    /**
     * Starts every Processor that belongs directly to the given group, waits up to
     * {@code componentEnableTimeoutMillis} for each start to complete, and then recurses into each
     * child group. All starts are requested before any of them is awaited.
     *
     * @param processGroup the group whose processors are started
     * @throws IllegalStateException if waiting for a processor fails, times out or is interrupted;
     *         the original exception is always attached as the cause
     */
    private void startProcessors(ProcessGroup processGroup) {
        final Collection<ProcessorNode> processors = processGroup.getProcessors();
        final Map<ProcessorNode, Future<?>> futures = new HashMap<>(processors.size());
        for (final ProcessorNode processorNode : processors) {
            futures.put(processorNode, processGroup.startProcessor(processorNode, true));
        }

        for (final Map.Entry<ProcessorNode, Future<?>> entry : futures.entrySet()) {
            final ProcessorNode processor = entry.getKey();
            final Future<?> future = entry.getValue();
            final long start = System.currentTimeMillis();
            try {
                future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // The broad catch would otherwise lose the interruption; restore it for callers.
                    Thread.currentThread().interrupt();
                }
                // Validate once and use the same result for both the decision and the message.
                final StatelessDataflowValidation validation = this.performValidation();
                if (validation.isValid()) {
                    throw new IllegalStateException("Processor " + processor + " is valid but has not fully started", e);
                }
                throw new IllegalStateException("Processor " + processor + " has not fully started. Current Validation Status is " + processor.getValidationStatus() + ". All validation errors: " + validation, e);
            }
            final long millis = System.currentTimeMillis() - start;
            logger.debug("Waited {} millis for {} to start", millis, processor);
        }
        processGroup.getProcessGroups().forEach(this::startProcessors);
    }

    /**
     * Initializes every Remote Process Group found under the given group and, once all of them have
     * been initialized, starts transmitting on each.
     *
     * @param processGroup the group to search for Remote Process Groups
     */
    private void startRemoteGroups(ProcessGroup processGroup) {
        final List<RemoteProcessGroup> remoteGroups = processGroup.findAllRemoteProcessGroups();
        for (final RemoteProcessGroup remoteGroup : remoteGroups) {
            remoteGroup.initialize();
        }
        for (final RemoteProcessGroup remoteGroup : remoteGroups) {
            remoteGroup.startTransmitting();
        }
    }

    /**
     * Submits one run of the dataflow to the dataflow executor and returns a handle through which the
     * caller can cancel the run or obtain its result.
     *
     * @param triggerContext the context supplied to this run
     * @return a trigger whose result methods read from the queue that the run publishes its result to
     * @throws IllegalStateException if the dataflow has not been initialized
     */
    public DataflowTrigger trigger(DataflowTriggerContext triggerContext) {
        if (!this.initialized) {
            throw new IllegalStateException("Must initialize dataflow before triggering it");
        }

        final LinkedBlockingQueue<TriggerResult> results = new LinkedBlockingQueue<>();
        final StandardExecutionProgress progress = new StandardExecutionProgress(this.rootGroup, this.internalFlowFileQueues, results, this.repositoryContextFactory, this.dataflowDefinition.getFailurePortNames(), this.tracker, this.stateManagerProvider, triggerContext, this::purge);
        final Future<?> submission = this.runDataflowExecutor.submit(() -> this.executeDataflow(results, progress, this.tracker, triggerContext));

        return new DataflowTrigger(){

            public void cancel() {
                // Mark the run as canceled first, then interrupt the worker.
                progress.notifyExecutionCanceled();
                submission.cancel(true);
            }

            public Optional<TriggerResult> getResultNow() {
                return Optional.ofNullable(results.poll());
            }

            public Optional<TriggerResult> getResult(long maxWaitTime, TimeUnit timeUnit) throws InterruptedException {
                return Optional.ofNullable(results.poll(maxWaitTime, timeUnit));
            }

            public TriggerResult getResult() throws InterruptedException {
                return results.take();
            }
        };
    }

    /**
     * Runs the dataflow once: resets the transaction threshold meter, builds a flow current from this
     * flow's collaborators and the trigger context, triggers it, and asks the execution progress to
     * enqueue the result. A {@link TerminatedTaskException} is reported as a canceled result; any
     * other throwable is logged and reported as an exceptional result.
     *
     * @param resultQueue receives a canceled or exceptional result when the run fails
     * @param executionProgress notified of failure and asked to enqueue the result on success
     * @param tracker the commit tracker passed to the flow current
     * @param triggerContext supplies the FlowFile supplier and provenance repository for this run
     */
    private void executeDataflow(BlockingQueue<TriggerResult> resultQueue, ExecutionProgress executionProgress, AsynchronousCommitTracker tracker, DataflowTriggerContext triggerContext) {
        final long begunAtNanos = System.nanoTime();
        this.transactionThresholdMeter.reset();

        final StandardStatelessFlowCurrent flowCurrent = new StandardStatelessFlowCurrent.Builder()
            .commitTracker(tracker)
            .statsTracker(this.statsTracker)
            .executionProgress(executionProgress)
            .processContextFactory(this.processContextFactory)
            .repositoryContextFactory(this.repositoryContextFactory)
            .rootConnectables(this.rootConnectables)
            .flowFileSupplier(triggerContext.getFlowFileSupplier())
            .provenanceEventRepository(triggerContext.getProvenanceEventRepository())
            .inputPorts(this.inputPorts)
            .transactionThresholdMeter(this.transactionThresholdMeter)
            .lifecycleStateManager(this.lifecycleStateManager)
            .build();

        // Completion callback: reports the elapsed time, in millis above 10 ms and in nanos otherwise.
        final Runnable onCompletion = () -> {
            if (!logger.isDebugEnabled()) {
                return;
            }
            final long elapsedNanos = System.nanoTime() - begunAtNanos;
            final String elapsed;
            if (elapsedNanos > TEN_MILLIS_IN_NANOS) {
                elapsed = TimeUnit.NANOSECONDS.toMillis(elapsedNanos) + " millis";
            } else {
                elapsed = NumberFormat.getInstance().format(elapsedNanos) + " nanos";
            }
            logger.debug("Ran dataflow in {}", elapsed);
        };

        try {
            flowCurrent.triggerFlow();
            executionProgress.enqueueTriggerResult(onCompletion, failure -> logger.error("Failed to execute dataflow", failure));
            logger.debug("Completed triggering of components in dataflow. Will not wait for acknowledgment as the invocation is asynchronous.");
        }
        catch (TerminatedTaskException terminated) {
            logger.debug("Caught a TerminatedTaskException", terminated);
            executionProgress.notifyExecutionFailed(terminated);
            resultQueue.offer(new CanceledTriggerResult());
        }
        catch (Throwable failure) {
            logger.error("Failed to execute dataflow", failure);
            executionProgress.notifyExecutionFailed(failure);
            resultQueue.offer(new ExceptionalTriggerResult(failure));
        }
    }

    /**
     * Indicates whether any reporting task, or any component within the root group, is stateful.
     *
     * <p>The answer is computed on the first call and cached in the {@code stateful} field.
     * The positive result produced by a stateful reporting task is cached as well, so the
     * reporting tasks are not re-scanned on every invocation.</p>
     *
     * @return {@code true} if at least one reporting task or root-group component is stateful
     */
    public boolean isStateful() {
        // Read the field once so the null check and the returned value agree.
        Boolean cached = this.stateful;
        if (cached == null) {
            // Reporting tasks are checked first; the root group is only walked when none is stateful.
            cached = this.reportingTasks.stream().anyMatch(this::isStateful) || this.isStateful(this.rootGroup);
            this.stateful = cached;
        }
        return cached;
    }

    /**
     * Checks the given group for stateful components: first its processors, then its controller
     * services (obtained with {@code getControllerServices(false)}), and finally its child
     * groups, recursively. Evaluation stops at the first stateful component found.
     *
     * @param processGroup the group to inspect
     * @return {@code true} if any processor, controller service or child group is stateful
     */
    private boolean isStateful(ProcessGroup processGroup) {
        return processGroup.getProcessors().stream().anyMatch(this::isStateful)
            || processGroup.getControllerServices(false).stream().anyMatch(this::isStateful)
            || processGroup.getProcessGroups().stream().anyMatch(this::isStateful);
    }

    /**
     * Asks the processor wrapped by the given node whether it is stateful, using a process
     * context created for that node by the process context factory.
     *
     * @param processorNode the processor node to inspect
     * @return the value reported by the processor's {@code isStateful} method
     */
    private boolean isStateful(ProcessorNode processorNode) {
        return processorNode.getProcessor().isStateful(this.processContextFactory.createProcessContext((Connectable) processorNode));
    }

    /**
     * Asks the controller service implementation behind the given node whether it is stateful,
     * using a freshly built {@code StandardConfigurationContext} for that node.
     *
     * @param controllerServiceNode the controller service node to inspect
     * @return the value reported by the controller service's {@code isStateful} method
     */
    private boolean isStateful(ControllerServiceNode controllerServiceNode) {
        ControllerService implementation = controllerServiceNode.getControllerServiceImplementation();
        ConfigurationContext configurationContext = new StandardConfigurationContext((ComponentNode) controllerServiceNode, (ControllerServiceLookup) this.controllerServiceProvider, null);
        return implementation.isStateful(configurationContext);
    }

    /**
     * Asks the reporting task wrapped by the given node whether it is stateful, passing the
     * node's own reporting context.
     *
     * @param reportingTaskNode the reporting task node to inspect
     * @return the value reported by the reporting task's {@code isStateful} method
     */
    private boolean isStateful(ReportingTaskNode reportingTaskNode) {
        return reportingTaskNode.getReportingTask().isStateful(reportingTaskNode.getReportingContext());
    }

    /**
     * Returns the names of the input ports known to this flow.
     *
     * @return the key set of the {@code inputPortsByName} map (a view of that map, not a copy)
     */
    public Set<String> getInputPortNames() {
        Set<String> portNames = this.inputPortsByName.keySet();
        return portNames;
    }

    /**
     * Collects the names of the root group's output ports.
     *
     * @return a newly built set containing the name of every output port of the root group
     */
    public Set<String> getOutputPortNames() {
        Set<String> portNames = new HashSet<>();
        for (Connectable outputPort : this.rootGroup.getOutputPorts()) {
            portNames.add(outputPort.getName());
        }
        return portNames;
    }

    /**
     * Enqueues a FlowFile whose content is the given byte array by delegating to the
     * {@code InputStream}-based overload.
     *
     * @param flowFileContents the content of the FlowFile
     * @param attributes the attributes to set on the FlowFile
     * @param portName the name of the input port to enqueue to
     * @return the queue size reported by the {@code InputStream}-based overload
     * @throws FlowFileAccessException if an {@code IOException} occurs while enqueuing or while closing the stream
     */
    public QueueSize enqueue(byte[] flowFileContents, Map<String, String> attributes, String portName) {
        // try-with-resources closes the stream on every path and routes a close failure
        // through the same IOException handler instead of letting it escape unwrapped.
        try (InputStream contentStream = new ByteArrayInputStream(flowFileContents)) {
            return this.enqueue(contentStream, attributes, portName);
        } catch (IOException e) {
            throw new FlowFileAccessException("Failed to enqueue FlowFile", e);
        }
    }

    /**
     * Creates a FlowFile from the given stream and attributes and transfers it through the named
     * input port of the root group.
     *
     * <p>The FlowFile is created, written and transferred within a single process session that is
     * committed asynchronously. If anything fails, the session is rolled back and the failure is
     * rethrown unchanged.</p>
     *
     * @param flowFileContents stream supplying the FlowFile content; it is read but not closed here
     * @param attributes the attributes to set on the FlowFile
     * @param portName the name of the input port to enqueue to
     * @return the size of the queue of the first connection returned by the port's connection set
     * @throws IllegalArgumentException if no input port with the given name exists
     * @throws IllegalStateException if the input port has no outgoing connections
     */
    public QueueSize enqueue(InputStream flowFileContents, Map<String, String> attributes, String portName) {
        Port inputPort = this.rootGroup.getInputPortByName(portName);
        if (inputPort == null) {
            throw new IllegalArgumentException("No Input Port exists with name <" + portName + ">. Valid Port names are " + this.getInputPortNames());
        }
        RepositoryContext repositoryContext = this.repositoryContextFactory.createRepositoryContext((Connectable) inputPort, (ProvenanceEventRepository) new StatelessProvenanceRepository(10));
        StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false, (PerformanceTracker) new NopPerformanceTracker());
        ProcessSession session = sessionFactory.createSession();
        try {
            // Typed set: avoids the raw type and the unchecked cast on the iterator element.
            Set<Connection> portConnections = inputPort.getConnections();
            if (portConnections.isEmpty()) {
                throw new IllegalStateException("Cannot enqueue data for Input Port <" + portName + "> because it has no outgoing connections");
            }
            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, out -> StreamUtils.copy(flowFileContents, out));
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, LocalPort.PORT_RELATIONSHIP);
            session.commitAsync();
            Connection firstConnection = portConnections.iterator().next();
            return firstConnection.getFlowFileQueue().size();
        } catch (Throwable t) {
            // Undo any partial session work before propagating the original failure.
            session.rollback();
            throw t;
        }
    }

    /**
     * Reports whether any connection in the flow currently has a non-empty active queue.
     *
     * @return {@code true} as soon as one connection's active queue is found to be non-empty,
     *         {@code false} if all of them are empty
     */
    public boolean isFlowFileQueued() {
        boolean queued = false;
        for (Connection connection : this.allConnections) {
            if (!connection.getFlowFileQueue().isActiveQueueEmpty()) {
                queued = true;
                break;
            }
        }
        return queued;
    }

    /**
     * Empties every connection's queue and then purges the content repository.
     *
     * <p>For each connection, the queued FlowFiles are drained, a delete-marked repository record
     * is built for each of them, and the FlowFile repository is updated with those records.
     * A failure for one connection is logged as a warning and does not stop the remaining
     * connections from being processed.</p>
     */
    public void purge() {
        for (Connection connection : this.allConnections) {
            // A fresh list per connection, so nothing carries over between connections.
            ArrayList<FlowFileRecord> drained = new ArrayList<>();
            try {
                FlowFileQueue queue = connection.getFlowFileQueue();
                ((DrainableFlowFileQueue) queue).drainTo(drained);

                ArrayList<StandardRepositoryRecord> deletions = new ArrayList<>();
                for (FlowFileRecord drainedFlowFile : drained) {
                    StandardRepositoryRecord deletion = new StandardRepositoryRecord(queue, drainedFlowFile);
                    deletion.markForDelete();
                    deletions.add(deletion);
                }
                this.repositoryContextFactory.getFlowFileRepository().updateRepository(deletions);
            } catch (Exception e) {
                logger.warn("Failed to update FlowFile Repository in order to notify it of transient claims. Some content in the Content Repository may not be cleaned up until restart", e);
            }
        }
        this.repositoryContextFactory.getContentRepository().purge();
    }

    /**
     * Fetches the state of all components for the given scope from the state manager provider
     * and returns it in serialized form.
     *
     * @param scope the scope whose component states are requested
     * @return a map from component identifier to that component's serialized state
     */
    public Map<String, String> getComponentStates(Scope scope) {
        return this.serializeStateMaps(this.stateManagerProvider.getAllComponentStates(scope));
    }

    /**
     * Converts each state map into a String using the object mapper.
     *
     * <p>State maps that have no state version are left out of the result.</p>
     *
     * @param stateMaps state maps keyed by component identifier; may be {@code null}
     * @return serialized state keyed by component identifier; an empty map if {@code stateMaps} is {@code null}
     * @throws RuntimeException if the object mapper fails to serialize a state map
     */
    private Map<String, String> serializeStateMaps(Map<String, StateMap> stateMaps) {
        if (stateMaps == null) {
            return Collections.emptyMap();
        }

        Map<String, String> serializedById = new HashMap<>();
        for (Map.Entry<String, StateMap> entry : stateMaps.entrySet()) {
            String componentId = entry.getKey();
            StateMap stateMap = entry.getValue();
            if (stateMap.getStateVersion().isPresent()) {
                SerializableStateMap serializable = new SerializableStateMap();
                serializable.setStateValues(stateMap.toMap());
                serializable.setVersion(stateMap.getStateVersion().orElse(null));

                String json;
                try {
                    json = this.objectMapper.writeValueAsString(serializable);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to serialize components' state maps as Strings", e);
                }
                serializedById.put(componentId, json);
            }
        }
        return serializedById;
    }

    /**
     * Deserializes the given component states and pushes them to the state manager provider
     * for the given scope.
     *
     * @param componentStates serialized state keyed by component identifier
     * @param scope the scope to which the states apply
     */
    public void setComponentStates(Map<String, String> componentStates, Scope scope) {
        this.stateManagerProvider.updateComponentsStates(this.deserializeStateMaps(componentStates), scope);
    }

    /**
     * Converts serialized component states back into state maps using the object mapper.
     *
     * <p>An entry that cannot be deserialized is logged and skipped, so that one bad entry does
     * not prevent the remaining components from having their state restored. This also covers
     * an entry whose content deserializes to {@code null} (such as the JSON literal
     * {@code null}), which would otherwise fail with a {@code NullPointerException} and abort
     * the whole operation.</p>
     *
     * @param componentStates serialized state keyed by component identifier; may be {@code null}
     * @return state maps keyed by component identifier; an empty map if {@code componentStates} is {@code null}
     */
    private Map<String, StateMap> deserializeStateMaps(Map<String, String> componentStates) {
        if (componentStates == null) {
            return Collections.emptyMap();
        }
        HashMap<String, StateMap> deserializedStateMaps = new HashMap<String, StateMap>();
        for (Map.Entry<String, String> entry : componentStates.entrySet()) {
            SerializableStateMap deserialized;
            String componentId = entry.getKey();
            String serialized = entry.getValue();
            try {
                deserialized = this.objectMapper.readValue(serialized, SerializableStateMap.class);
            } catch (Exception e) {
                logger.error("Failed to deserialized components' state for component with ID {}. State will be reset to empty", componentId, e);
                continue;
            }
            if (deserialized == null) {
                // The mapper produced no object for this entry; treat it like any other unreadable entry.
                logger.error("Serialized state for component with ID {} contained no state map. State will be reset to empty", componentId);
                continue;
            }
            StandardStateMap stateMap = new StandardStateMap(deserialized.getStateValues(), Optional.ofNullable(deserialized.getVersion()));
            deserializedStateMaps.put(componentId, stateMap);
        }
        return deserializedStateMaps;
    }

    /**
     * Returns the bulletin repository held by this flow.
     *
     * @return the value of the {@code bulletinRepository} field
     */
    public BulletinRepository getBulletinRepository() {
        BulletinRepository repository = this.bulletinRepository;
        return repository;
    }

    /**
     * Looks up the value of a named counter belonging to a specific component.
     *
     * <p>The component identifier is first resolved to a processor instance identifier. A counter
     * matches when its context ends with {@code " (<instanceId>)"} and its name equals
     * {@code counterName}.</p>
     *
     * @param componentId the identifier, or versioned component identifier, of the processor
     * @param counterName the name of the counter
     * @return the value of the first matching counter, or an empty {@code OptionalLong} if the
     *         component cannot be resolved or no counter matches
     */
    public OptionalLong getCounter(String componentId, String counterName) {
        String instanceId = this.findInstanceId(componentId);
        if (instanceId == null) {
            // Unknown component: do not fall through and match against the literal text "null".
            return OptionalLong.empty();
        }
        // Built once rather than on every predicate evaluation.
        String contextSuffix = " (" + instanceId + ")";
        return this.findCounter(counter -> counter.getContext().endsWith(contextSuffix) && counter.getName().equals(counterName));
    }

    /**
     * Returns the values of all counters whose name fully matches the given pattern, keyed by
     * counter name. Counters whose context starts with {@code "All "} are excluded.
     *
     * <p>When several counters share a name (for instance, the same counter name reported under
     * different contexts), their values are summed. Without a merge function the collector would
     * throw an {@code IllegalStateException} on the first duplicate name.</p>
     *
     * @param counterNamePattern pattern that a counter's name must fully match
     * @return a map from counter name to the summed value of the matching counters
     */
    public Map<String, Long> getCounters(Pattern counterNamePattern) {
        CounterRepository counterRepository = this.repositoryContextFactory.getCounterRepository();
        return counterRepository.getCounters().stream()
            .filter(counter -> !counter.getContext().startsWith("All ") && counterNamePattern.matcher(counter.getName()).matches())
            .collect(Collectors.toMap(Counter::getName, Counter::getValue, Long::sum));
    }

    /**
     * Resolves a component identifier to the identifier of the first processor in the root group
     * (searched via {@code findAllProcessors()}) whose identifier or versioned component
     * identifier equals the given value.
     *
     * @param componentId the identifier or versioned component identifier to look for
     * @return the matching processor's identifier, or {@code null} if no processor matches
     */
    private String findInstanceId(String componentId) {
        for (ProcessorNode processor : this.rootGroup.findAllProcessors()) {
            boolean matchesIdentifier = Objects.equals(processor.getIdentifier(), componentId);
            if (matchesIdentifier || Objects.equals(processor.getVersionedComponentId().orElse(""), componentId)) {
                return processor.getIdentifier();
            }
        }
        return null;
    }

    /**
     * Returns the value of the first counter in the counter repository accepted by the predicate.
     *
     * @param predicate the test a counter must pass
     * @return the value of the first matching counter, or an empty {@code OptionalLong} if none matches
     */
    private OptionalLong findCounter(Predicate<Counter> predicate) {
        return this.repositoryContextFactory.getCounterRepository()
            .getCounters()
            .stream()
            .filter(predicate)
            .mapToLong(counter -> counter.getValue())
            .findFirst();
    }

    /**
     * Collects the processor held by every processor node that the root group returns from
     * {@code findAllProcessors()}.
     *
     * @return a newly built set of those processors
     */
    public Set<Processor> findAllProcessors() {
        Set<Processor> processors = new HashSet<>();
        for (ProcessorNode processorNode : this.rootGroup.findAllProcessors()) {
            processors.add(processorNode.getProcessor());
        }
        return processors;
    }

    /**
     * Returns the content repository provided by the repository context factory.
     *
     * @return the content repository
     */
    public ContentRepository getContentRepository() {
        ContentRepository contentRepository = this.repositoryContextFactory.getContentRepository();
        return contentRepository;
    }

    /**
     * Immutable holder pairing a task with the period and time unit of its schedule.
     */
    private static class BackgroundTask {
        private final Runnable task;
        private final long schedulingPeriod;
        private final TimeUnit schedulingUnit;

        /**
         * @param task the work to run
         * @param schedulingPeriod the scheduling period, expressed in {@code schedulingUnit}
         * @param schedulingUnit the unit of {@code schedulingPeriod}
         */
        public BackgroundTask(Runnable task, long schedulingPeriod, TimeUnit schedulingUnit) {
            this.task = task;
            this.schedulingPeriod = schedulingPeriod;
            this.schedulingUnit = schedulingUnit;
        }

        /** @return the work to run */
        public Runnable getTask() {
            return task;
        }

        /** @return the scheduling period, expressed in the unit returned by {@link #getSchedulingUnit()} */
        public long getSchedulingPeriod() {
            return schedulingPeriod;
        }

        /** @return the unit of the scheduling period */
        public TimeUnit getSchedulingUnit() {
            return schedulingUnit;
        }
    }

    /**
     * Mutable bean holding a component's state values and state version. Instances are written
     * to and read from Strings by the enclosing class's object mapper.
     */
    private static class SerializableStateMap {
        private String version;
        private Map<String, String> stateValues;

        // NOTE(review): presumably kept as a no-arg constructor so the object mapper can instantiate the bean; confirm.
        private SerializableStateMap() {
        }

        /** @return the state version, or {@code null} if none has been set */
        public String getVersion() {
            return version;
        }

        /** @param version the state version to store */
        public void setVersion(String version) {
            this.version = version;
        }

        /** @return the state values, or {@code null} if none have been set */
        public Map<String, String> getStateValues() {
            return stateValues;
        }

        /** @param stateValues the state values to store; the map is kept by reference, not copied */
        public void setStateValues(Map<String, String> stateValues) {
            this.stateValues = stateValues;
        }
    }
}

