/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.flow;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.ConnectionUtils;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
import org.apache.nifi.controller.repository.metrics.tracking.TrackedStats;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.flow.FlowFileSupplier;
import org.apache.nifi.stateless.flow.StatelessFlowCurrent;
import org.apache.nifi.stateless.flow.TransactionThresholdMeter;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stateless.session.StatelessProcessSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardStatelessFlowCurrent
implements StatelessFlowCurrent {
    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlowCurrent.class);
    // Consulted before source components are triggered; fed the FlowFile/byte counts produced by each root connectable.
    private final TransactionThresholdMeter transactionThresholdMeter;
    // Queried for which connectables are ready and whether the last trigger made progress; also receives failure callbacks.
    private final AsynchronousCommitTracker commitTracker;
    // Queried for cancellation and queued data; notified when triggerFlow() fails.
    private final ExecutionProgress executionProgress;
    // Components triggered at the start of every pass of triggerFlow().
    private final Set<Connectable> rootConnectables;
    // Optional (may be null); when present it is asked for one FlowFile per input port.
    private final FlowFileSupplier flowFileSupplier;
    // Handed to every StatelessProcessSessionFactory created by this class.
    private final ProvenanceEventRepository provenanceEventRepository;
    // Ports iterated when the FlowFile supplier is triggered.
    private final Collection<Port> inputPorts;
    // Source of the FlowFile, content and FlowFile-event repositories used below.
    private final RepositoryContextFactory repositoryContextFactory;
    // Creates the ProcessContext passed to each onTrigger call.
    private final ProcessContextFactory processContextFactory;
    // Supplies the LifecycleState whose active-thread count brackets each trigger.
    private final LifecycleStateManager lifecycleStateManager;
    // Starts the statistics tracking that is closed out in registerProcessEvent().
    private final StatsTracker statsTracker;

    /**
     * Copies every collaborator held by the given builder into this instance.
     * The constructor is private; instances are created through {@link Builder#build()}.
     *
     * @param builder the builder whose current field values are copied
     */
    private StandardStatelessFlowCurrent(Builder builder) {
        // Execution bookkeeping
        executionProgress = builder.executionProgress;
        commitTracker = builder.commitTracker;
        transactionThresholdMeter = builder.transactionThresholdMeter;
        statsTracker = builder.statsTracker;
        lifecycleStateManager = builder.lifecycleStateManager;

        // Flow topology and data entry points
        rootConnectables = builder.rootConnectables;
        inputPorts = builder.inputPorts;
        flowFileSupplier = builder.flowFileSupplier;

        // Repositories and factories
        repositoryContextFactory = builder.repositoryContextFactory;
        provenanceEventRepository = builder.provenanceEventRepository;
        processContextFactory = builder.processContextFactory;
    }

    /**
     * Runs the dataflow until no component is ready and no data remains queued,
     * or until the execution is canceled.
     *
     * <p>Each pass first triggers the root connectables, then keeps triggering ready
     * components for as long as each one reports {@link NextConnectable#NEXT_READY}.
     * Any other result ends the inner loop:
     * <ul>
     *   <li>{@link NextConnectable#NONE} stops the flow immediately;</li>
     *   <li>{@link NextConnectable#SOURCE_CONNECTABLE} falls back to the outer loop so the
     *       root connectables are triggered again. Without this, a ready component that
     *       cannot make progress would be re-triggered forever and never receive new data.</li>
     * </ul>
     *
     * <p>Any failure is reported to the execution progress and to the commit tracker's
     * failure callbacks before being rethrown unchanged.
     */
    @Override
    public void triggerFlow() {
        try {
            boolean completionReached = false;
            while (!completionReached) {
                this.triggerRootConnectables();

                NextConnectable nextConnectable = NextConnectable.NEXT_READY;
                while (this.commitTracker.isAnyReady() && nextConnectable == NextConnectable.NEXT_READY) {
                    final Connectable connectable = this.commitTracker.getNextReady();
                    logger.debug("The next ready component to be triggered: {}", connectable);
                    nextConnectable = this.triggerWhileReady(connectable);
                }

                // Canceled: nothing further may be triggered.
                if (nextConnectable == NextConnectable.NONE) {
                    return;
                }

                // Done only when nothing is ready AND nothing is left queued; otherwise loop back
                // to the root connectables (this is also the SOURCE_CONNECTABLE path).
                completionReached = !this.commitTracker.isAnyReady() && this.isFlowQueueEmpty();
            }
        } catch (Throwable t) {
            this.executionProgress.notifyExecutionFailed(t);
            this.commitTracker.triggerFailureCallbacks(t);
            throw t;
        }
    }

    /**
     * Tells whether the flow holds no more data to work on.
     *
     * @return {@code false} if the execution progress reports queued data, or if any incoming
     *         connection of any root connectable reports an unacknowledged FlowFile;
     *         {@code true} otherwise
     */
    private boolean isFlowQueueEmpty() {
        if (executionProgress.isDataQueued()) {
            return false;
        }

        for (final Connectable root : rootConnectables) {
            for (final Connection incoming : root.getIncomingConnections()) {
                if (incoming.getFlowFileQueue().isUnacknowledgedFlowFile()) {
                    return false;
                }
            }
        }

        return true;
    }

    /**
     * Brings new data into the flow for one pass.
     *
     * <p>If the transaction thresholds are not yet met, the FlowFile supplier is tried first;
     * when it supplies at least one FlowFile nothing else is done. Otherwise every root
     * connectable is triggered, except those that are currently yielded and, once the
     * thresholds are met, those without an incoming connection. If no root connectable
     * was triggered, the thread sleeps for 10 ms.
     */
    private void triggerRootConnectables() {
        final boolean thresholdsMet = transactionThresholdMeter.isThresholdMet();
        if (!thresholdsMet && triggerFlowFileSupplier()) {
            return;
        }

        boolean anyTriggered = false;
        for (final Connectable root : rootConnectables) {
            // Once thresholds are met, components without an incoming connection are not run.
            final boolean suppressedSource = thresholdsMet && !root.hasIncomingConnection();
            if (suppressedSource || isYielded(root)) {
                continue;
            }

            anyTriggered = true;
            triggerRootConnectable(root);
        }

        if (anyTriggered) {
            return;
        }

        try {
            Thread.sleep(10L);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for whoever is driving this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Checks whether the component's yield period is still running.
     *
     * @param connectable the component to inspect
     * @return {@code true} if its yield expiration is positive and later than the current
     *         system time in milliseconds; {@code false} otherwise
     */
    private boolean isYielded(Connectable connectable) {
        final long expiration = connectable.getYieldExpiration();
        return expiration > 0L && expiration > System.currentTimeMillis();
    }

    /**
     * Asks the FlowFile supplier, if one is configured, for a FlowFile for each input port
     * and distributes whatever it provides to that port's outgoing connections.
     *
     * <p>Ports without outgoing connections are skipped without consulting the supplier.
     * Each supplied FlowFile is cloned across the port's connections and the destination
     * of every connection is registered with the commit tracker.
     *
     * @return {@code true} if the supplier produced a FlowFile for at least one input port;
     *         {@code false} if there is no supplier or it produced nothing
     */
    private boolean triggerFlowFileSupplier() {
        if (this.flowFileSupplier == null) {
            return false;
        }

        boolean flowFileSupplied = false;
        for (final Port inputPort : this.inputPorts) {
            final Set<Connection> outputConnections = inputPort.getConnections();
            if (outputConnections.isEmpty()) {
                continue;
            }

            // Wildcard keeps the element type open; the value is narrowed to FlowFileRecord below.
            final Optional<?> flowFileOptional = this.flowFileSupplier.getFlowFile(inputPort.getName());
            if (flowFileOptional.isEmpty()) {
                continue;
            }

            flowFileSupplied = true;
            final FlowFileRecord flowFile = (FlowFileRecord) flowFileOptional.get();
            final ConnectionUtils.FlowFileCloneResult cloneResult = ConnectionUtils.clone(
                flowFile,
                outputConnections,
                this.repositoryContextFactory.getFlowFileRepository(),
                this.repositoryContextFactory.getContentRepository());
            cloneResult.distributeFlowFiles();

            for (final Connection connection : outputConnections) {
                this.commitTracker.addConnectable(connection.getDestination());
            }
        }

        return flowFileSupplied;
    }

    /**
     * Triggers one root connectable exactly once and records what it produced.
     *
     * <p>The commit tracker's progress is reset and statistics tracking is started before the
     * trigger. The component's active-thread count is raised for the duration of the call;
     * it is lowered again and a process event with one invocation is registered whether or
     * not the trigger throws. Only after a successful trigger are the FlowFile and byte
     * counts reported by the commit tracker added to the transaction threshold meter.
     *
     * @param connectable the root component to trigger
     */
    private void triggerRootConnectable(Connectable connectable) {
        final LifecycleState state = lifecycleStateManager.getOrRegisterLifecycleState(connectable.getIdentifier(), true, false);
        commitTracker.resetProgress();
        final TrackedStats stats = statsTracker.startTracking();
        final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(
            connectable,
            repositoryContextFactory,
            provenanceEventRepository,
            processContextFactory,
            executionProgress,
            false,
            commitTracker,
            stats.getPerformanceTracker());

        state.incrementActiveThreadCount(null);
        try {
            trigger(connectable, sessionFactory, stats);
        } finally {
            state.decrementActiveThreadCount();
            registerProcessEvent(connectable, 1, stats);
        }

        transactionThresholdMeter.incrementFlowFiles(commitTracker.getFlowFilesProduced());
        transactionThresholdMeter.incrementBytes(commitTracker.getBytesProduced());
    }

    /**
     * Repeatedly triggers the given component for as long as the commit tracker reports it
     * ready, and tells the caller what to do next.
     *
     * <p>The component's active-thread count is raised for the whole loop; on every exit path
     * it is lowered again and a process event with one invocation is registered.
     *
     * @param connectable the component to trigger
     * @return {@link NextConnectable#NONE} if the dataflow was canceled;
     *         {@link NextConnectable#SOURCE_CONNECTABLE} if a trigger made no progress and the
     *         transaction thresholds are not yet met;
     *         {@link NextConnectable#NEXT_READY} if the component is no longer ready, or if it
     *         is a batch-output Output Port that made no progress while its Process Group
     *         still has data queued for processing
     */
    private NextConnectable triggerWhileReady(Connectable connectable) {
        final LifecycleState state = lifecycleStateManager.getOrRegisterLifecycleState(connectable.getIdentifier(), true, false);
        final TrackedStats stats = statsTracker.startTracking();
        final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(
            connectable,
            repositoryContextFactory,
            provenanceEventRepository,
            processContextFactory,
            executionProgress,
            false,
            commitTracker,
            stats.getPerformanceTracker());

        state.incrementActiveThreadCount(null);
        try {
            while (commitTracker.isReady(connectable)) {
                if (executionProgress.isCanceled()) {
                    logger.info("Dataflow was canceled so will not trigger any more components");
                    return NextConnectable.NONE;
                }

                // Reset first so isProgress() reflects this single trigger only.
                commitTracker.resetProgress();
                trigger(connectable, sessionFactory, stats);

                if (commitTracker.isProgress()) {
                    logger.debug("{} was triggered and made progress", connectable);
                    continue;
                }

                final boolean batchOutputPortAwaitingData = connectable.getConnectableType() == ConnectableType.OUTPUT_PORT
                    && connectable.getProcessGroup().getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT
                    && connectable.getProcessGroup().isDataQueuedForProcessing();
                if (batchOutputPortAwaitingData) {
                    logger.debug("{} was triggered but unable to make process. Data is still available for processing, so continue triggering components within the Process Group", connectable);
                    return NextConnectable.NEXT_READY;
                }

                if (!transactionThresholdMeter.isThresholdMet()) {
                    logger.debug("{} was triggered but unable to make progress. Maximum transaction thresholds {} have not been reached (currently at {}) so will trigger source components to run.",
                        connectable, transactionThresholdMeter.getThresholds(), transactionThresholdMeter);
                    return NextConnectable.SOURCE_CONNECTABLE;
                }

                // Thresholds met: stay in the loop rather than handing control back to the sources.
                logger.debug("{} was triggered but unable to make progress. The transaction thresholds {} have been met (currently at {}). Will not trigger source components to run.",
                    connectable, transactionThresholdMeter.getThresholds(), transactionThresholdMeter);
            }

            return NextConnectable.NEXT_READY;
        } finally {
            state.decrementActiveThreadCount();
            registerProcessEvent(connectable, 1, stats);
        }
    }

    /**
     * Invokes the component's {@code onTrigger} with a freshly created process context.
     *
     * @param connectable the component to run
     * @param sessionFactory the session factory handed to the component
     * @param trackedStats statistics handle for this trigger; not read by this method
     */
    private void trigger(Connectable connectable, ProcessSessionFactory sessionFactory, TrackedStats trackedStats) {
        final ProcessContext context = processContextFactory.createProcessContext(connectable);

        logger.debug("Triggering {}", connectable);
        connectable.onTrigger(context, sessionFactory);
    }

    /**
     * Ends statistics tracking and stores the resulting event in the FlowFile event repository
     * under the component's identifier.
     *
     * <p>An {@link IOException} raised while doing so is logged and not propagated.
     *
     * @param connectable the component the statistics belong to
     * @param invocations the invocation count to record on the event
     * @param trackedStats the tracking handle to end
     */
    private void registerProcessEvent(Connectable connectable, int invocations, TrackedStats trackedStats) {
        try {
            final StandardFlowFileEvent event = trackedStats.end();
            event.setInvocations(invocations);

            final String componentId = connectable.getIdentifier();
            repositoryContextFactory.getFlowFileEventRepository().updateRepository(event, componentId);
        } catch (IOException ioe) {
            // Trailing exception argument gives the logger the stack trace; the middle one fills the second placeholder.
            logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}",
                connectable.getRunnableComponent(), ioe, ioe);
        }
    }

    /**
     * Fluent builder for {@link StandardStatelessFlowCurrent}.
     *
     * <p>Every setter stores its argument and returns this builder. {@link #build()} rejects
     * a missing required collaborator with a {@link NullPointerException}. The FlowFile
     * supplier is optional, and the input ports are only read when a supplier is configured.
     */
    public static class Builder {
        private TransactionThresholdMeter transactionThresholdMeter;
        private AsynchronousCommitTracker commitTracker;
        private ExecutionProgress executionProgress;
        private Set<Connectable> rootConnectables;
        private Collection<Port> inputPorts;
        private FlowFileSupplier flowFileSupplier = null;
        private ProvenanceEventRepository provenanceEventRepository;
        private RepositoryContextFactory repositoryContextFactory;
        private ProcessContextFactory processContextFactory;
        private LifecycleStateManager lifecycleStateManager;
        private StatsTracker statsTracker;

        /**
         * Validates the required collaborators and creates the flow current.
         *
         * @return a new {@link StandardStatelessFlowCurrent} configured from this builder
         * @throws NullPointerException if any required collaborator has not been set
         */
        public StandardStatelessFlowCurrent build() {
            Objects.requireNonNull(this.transactionThresholdMeter, "Transaction Threshold Meter must be set");
            Objects.requireNonNull(this.commitTracker, "Commit Tracker must be set");
            Objects.requireNonNull(this.executionProgress, "Execution Progress must be set");
            Objects.requireNonNull(this.rootConnectables, "Root Conectables must be set");
            Objects.requireNonNull(this.repositoryContextFactory, "Repository Context Factory must be set");
            Objects.requireNonNull(this.provenanceEventRepository, "Provenance Event Repository must be set");
            Objects.requireNonNull(this.processContextFactory, "Process Context Factory must be set");
            // Dereferenced every time a component is triggered, so fail here rather than mid-flow.
            Objects.requireNonNull(this.lifecycleStateManager, "Lifecycle State Manager must be set");
            Objects.requireNonNull(this.statsTracker, "Stats Tracker must be set");
            return new StandardStatelessFlowCurrent(this);
        }

        /**
         * @param transactionThresholdMeter the meter consulted to decide whether more data may be brought in (required)
         * @return this builder
         */
        public Builder transactionThresholdMeter(TransactionThresholdMeter transactionThresholdMeter) {
            this.transactionThresholdMeter = transactionThresholdMeter;
            return this;
        }

        /**
         * @param lifecycleStateManager the manager providing each component's lifecycle state (required)
         * @return this builder
         */
        public Builder lifecycleStateManager(LifecycleStateManager lifecycleStateManager) {
            this.lifecycleStateManager = lifecycleStateManager;
            return this;
        }

        /**
         * @param commitTracker the tracker of ready components and progress (required)
         * @return this builder
         */
        public Builder commitTracker(AsynchronousCommitTracker commitTracker) {
            this.commitTracker = commitTracker;
            return this;
        }

        /**
         * @param statsTracker the tracker used to start per-trigger statistics (required)
         * @return this builder
         */
        public Builder statsTracker(StatsTracker statsTracker) {
            this.statsTracker = statsTracker;
            return this;
        }

        /**
         * @param executionProgress the progress handle checked for cancellation and notified of failure (required)
         * @return this builder
         */
        public Builder executionProgress(ExecutionProgress executionProgress) {
            this.executionProgress = executionProgress;
            return this;
        }

        /**
         * @param rootConnectables the components triggered at the start of each pass (required)
         * @return this builder
         */
        public Builder rootConnectables(Set<Connectable> rootConnectables) {
            this.rootConnectables = rootConnectables;
            return this;
        }

        /**
         * @param flowFileSupplier the supplier of FlowFiles for the input ports; may be {@code null}
         * @return this builder
         */
        public Builder flowFileSupplier(FlowFileSupplier flowFileSupplier) {
            this.flowFileSupplier = flowFileSupplier;
            return this;
        }

        /**
         * @param provenanceEventRepository the repository handed to each session factory (required)
         * @return this builder
         */
        public Builder provenanceEventRepository(ProvenanceEventRepository provenanceEventRepository) {
            this.provenanceEventRepository = provenanceEventRepository;
            return this;
        }

        /**
         * @param inputPorts the ports offered FlowFiles from the FlowFile supplier
         * @return this builder
         */
        public Builder inputPorts(Collection<Port> inputPorts) {
            this.inputPorts = inputPorts;
            return this;
        }

        /**
         * @param repositoryContextFactory the factory providing the repositories (required)
         * @return this builder
         */
        public Builder repositoryContextFactory(RepositoryContextFactory repositoryContextFactory) {
            this.repositoryContextFactory = repositoryContextFactory;
            return this;
        }

        /**
         * @param processContextFactory the factory creating a process context per trigger (required)
         * @return this builder
         */
        public Builder processContextFactory(ProcessContextFactory processContextFactory) {
            this.processContextFactory = processContextFactory;
            return this;
        }
    }

    private static enum NextConnectable {
        NEXT_READY,
        SOURCE_CONNECTABLE,
        NONE;

    }
}

