/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.groups;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.groups.DataValve;
import org.apache.nifi.groups.DataValveDiagnostics;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardDataValve
implements DataValve {
    /** Logger used for valve open/close decisions and for state load/store failures. */
    private static final Logger logger = LoggerFactory.getLogger(StandardDataValve.class);
    /** State key under which the comma-joined contents of {@link #groupsWithDataFlowingIn} are stored. */
    private static final String GROUPS_WITH_DATA_FLOWING_IN_STATE_KEY = "groupsWithDataFlowingIn";
    /** State key under which the comma-joined contents of {@link #groupsWithDataFlowingOut} are stored. */
    private static final String GROUPS_WITH_DATA_FLOWING_OUT_STATE_KEY = "groupsWithDataFlowingOut";
    /** Group this valve belongs to; used to enumerate and look up child groups for diagnostics and in {@link #toString()}. */
    private final ProcessGroup processGroup;
    /** Used with {@code Scope.LOCAL} to load and persist the two identifier sets below. */
    private final StateManager stateManager;
    /** Identifiers of the Process Groups for which the valve is currently open for inbound data. */
    private final Set<String> groupsWithDataFlowingIn = new HashSet<String>();
    /** Identifiers of the Process Groups for which the valve is currently open for outbound data. */
    private final Set<String> groupsWithDataFlowingOut = new HashSet<String>();
    /**
     * Set when a request to close an outbound valve was ignored because the group still had data queued;
     * cleared whenever an outbound valve is newly opened. It is a single flag for the whole valve (not
     * tracked per group) and is not written by {@code storeState()}.
     */
    private boolean leftOpenDueToDataQueued = false;

    /**
     * Creates the valve and immediately attempts to restore the previously persisted
     * inbound/outbound identifier sets.
     *
     * @param processGroup the Process Group this valve belongs to
     * @param stateManager the manager used to load and persist the valve state
     */
    public StandardDataValve(ProcessGroup processGroup, StateManager stateManager) {
        // Both fields must be assigned before recoverState(): it reads the state manager
        // and logs this instance, whose toString() prints the process group.
        this.stateManager = stateManager;
        this.processGroup = processGroup;
        recoverState();
    }

    /**
     * Attempts to open the valve so that data may flow into the given Process Group.
     *
     * @param destinationGroup the group that data would flow into
     * @return {@code true} if the valve was already open for this group or has just been opened;
     *         {@code false} if there is currently a reason not to allow inbound data
     */
    public synchronized boolean tryOpenFlowIntoGroup(ProcessGroup destinationGroup) {
        if (groupsWithDataFlowingIn.contains(destinationGroup.getIdentifier())) {
            logger.debug("Allowing data to flow into {} because valve is already open", destinationGroup);
            return true;
        }

        final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(destinationGroup);
        if (forbiddenReason == null) {
            logger.debug("Opening valve to allow data to flow into {}", destinationGroup);
            groupsWithDataFlowingIn.add(destinationGroup.getIdentifier());
            storeState();
            return true;
        }

        // The outbound valve was previously left open only because data was still queued. If the
        // group has drained since then, drop it from the outbound set. The removal is in-memory
        // only (storeState() is not called here) and this call still reports false.
        final boolean outboundValveNoLongerNeeded = forbiddenReason == FlowInForbiddenReason.OPEN_FOR_OUTPUT
                && leftOpenDueToDataQueued
                && !destinationGroup.isDataQueued();
        if (outboundValveNoLongerNeeded) {
            groupsWithDataFlowingOut.remove(destinationGroup.getIdentifier());
        }
        return false;
    }

    /**
     * Determines why data may not currently flow into the given group. Checks are made in this
     * order: the group already has data queued; the group uses a Batch Output policy and its
     * outbound valve is open; one of its Input Ports is fed by an Output Port of a Batch Output
     * group whose outbound valve is open.
     *
     * @param destinationGroup the group that data would flow into
     * @return the first applicable reason, or {@code null} if none of the checks forbids inbound data
     */
    private FlowInForbiddenReason getReasonFlowIntoGroupNotAllowed(ProcessGroup destinationGroup) {
        if (destinationGroup.isDataQueued()) {
            logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
            return FlowInForbiddenReason.DATA_QUEUED;
        }

        final boolean batchOutput = destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT;
        if (batchOutput && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
            logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
            return FlowInForbiddenReason.OPEN_FOR_OUTPUT;
        }

        for (final Port inputPort : destinationGroup.getInputPorts()) {
            for (final Connection incoming : inputPort.getIncomingConnections()) {
                final Connectable upstream = incoming.getSource();
                // Only connections fed by another group's Output Port are relevant.
                if (upstream.getConnectableType() != ConnectableType.OUTPUT_PORT) {
                    continue;
                }

                final ProcessGroup upstreamGroup = upstream.getProcessGroup();
                if (upstreamGroup.getFlowFileOutboundPolicy() != FlowFileOutboundPolicy.BATCH_OUTPUT) {
                    continue;
                }

                if (groupsWithDataFlowingOut.contains(upstreamGroup.getIdentifier())) {
                    logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out", destinationGroup, inputPort, upstream);
                    return FlowInForbiddenReason.SOURCE_FLOWING_OUT;
                }
            }
        }

        return null;
    }

    /**
     * Closes the valve that allows data to flow into the given Process Group, if it is open.
     * <p>
     * For a group whose FlowFile Concurrency is {@code SINGLE_BATCH_PER_NODE}, the valve stays open
     * while any connection feeding one of the group's Input Ports still holds FlowFiles.
     *
     * @param destinationGroup the group whose inbound valve should be closed
     */
    public synchronized void closeFlowIntoGroup(ProcessGroup destinationGroup) {
        final String groupId = destinationGroup.getIdentifier();
        if (!groupsWithDataFlowingIn.contains(groupId)) {
            return;
        }

        if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
            for (final Port port : destinationGroup.getInputPorts()) {
                for (final Connection connection : port.getIncomingConnections()) {
                    if (!connection.getFlowFileQueue().isEmpty()) {
                        logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve", destinationGroup, connection);
                        return;
                    }
                }
            }
        }

        // Remove BEFORE persisting: storeState() serializes the current contents of
        // groupsWithDataFlowingIn, so storing first would persist this group as still open
        // and recoverState() would reopen the valve after a restart.
        groupsWithDataFlowingIn.remove(groupId);
        logger.debug("Closed valve so that data can no longer flow into {}", destinationGroup);
        storeState();
    }

    /**
     * Attempts to open the valve so that data may flow out of the given Process Group.
     *
     * @param sourceGroup the group that data would flow out of
     * @return {@code true} if the valve was already open for this group or has just been opened;
     *         {@code false} if there is currently a reason not to allow outbound data
     */
    public synchronized boolean tryOpenFlowOutOfGroup(ProcessGroup sourceGroup) {
        if (groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier())) {
            logger.debug("Allowing data to flow out of {} because valve is already open", sourceGroup);
            return true;
        }

        // The reason itself has already been logged at TRACE level by the helper.
        final boolean outputForbidden = getReasonFlowOutOfGroupNotAllowed(sourceGroup) != null;
        if (outputForbidden) {
            return false;
        }

        logger.debug("Opening valve to allow data to flow out of {}", sourceGroup);
        groupsWithDataFlowingOut.add(sourceGroup.getIdentifier());
        storeState();
        // A freshly opened outbound valve is no longer one that was merely "left open".
        leftOpenDueToDataQueued = false;
        return true;
    }

    /**
     * Determines why data may not currently flow out of the given group. Only connections from the
     * group's Output Ports to an Input Port of a group with FlowFile Concurrency
     * {@code SINGLE_BATCH_PER_NODE} are considered.
     *
     * @param sourceGroup the group that data would flow out of
     * @return a human-readable reason, or {@code null} if no considered connection forbids outbound data
     */
    private String getReasonFlowOutOfGroupNotAllowed(ProcessGroup sourceGroup) {
        for (final Port outputPort : sourceGroup.getOutputPorts()) {
            for (final Connection outgoing : outputPort.getConnections()) {
                final Connectable downstream = outgoing.getDestination();
                if (downstream.getConnectableType() != ConnectableType.INPUT_PORT) {
                    continue;
                }

                final ProcessGroup downstreamGroup = downstream.getProcessGroup();
                if (downstreamGroup.getFlowFileConcurrency() != FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
                    continue;
                }

                // Data is still waiting in the connection that feeds the downstream group.
                if (!outgoing.getFlowFileQueue().isEmpty()) {
                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is configured with a FlowFileConcurrency of Batch Per Node.", sourceGroup, outputPort, outgoing);
                    return "Output Connection already has data queued";
                }

                // The downstream group currently has its inbound valve open.
                if (groupsWithDataFlowingIn.contains(downstreamGroup.getIdentifier())) {
                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is currently allowing data to flow in", sourceGroup, outputPort, outgoing);
                    return "Destination Process Group is allowing data to flow in";
                }
            }
        }

        return null;
    }

    /**
     * Closes the valve that allows data to flow out of the given Process Group, if it is open.
     * When the group still has data queued the valve is left open and that fact is remembered
     * in {@code leftOpenDueToDataQueued}.
     *
     * @param sourceGroup the group whose outbound valve should be closed
     */
    public synchronized void closeFlowOutOfGroup(ProcessGroup sourceGroup) {
        if (!groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier())) {
            return;
        }

        if (sourceGroup.isDataQueued()) {
            logger.debug("Triggered to close flow of data out of group {} but group is not empty so will not close valve", sourceGroup);
            leftOpenDueToDataQueued = true;
        } else {
            logger.debug("Closed valve so that data can no longer flow out of {}", sourceGroup);
            groupsWithDataFlowingOut.remove(sourceGroup.getIdentifier());
            storeState();
        }
    }

    /**
     * Builds a snapshot describing the current state of the valve: which groups have an open
     * inbound or outbound valve and, for every child group of this valve's Process Group, a
     * textual reason why input and output are (or are not) currently allowed.
     * <p>
     * Groups are bucketed by reason text. Groups whose FlowFile Concurrency is not
     * {@code SINGLE_BATCH_PER_NODE}, or whose Outbound Policy is not {@code BATCH_OUTPUT}, are
     * bucketed under a label naming their configured value instead of being evaluated.
     *
     * @return the diagnostics; the returned collections are the ones built by this call
     */
    public synchronized DataValveDiagnostics getDiagnostics() {
        final Set<ProcessGroup> dataFlowingIn = resolveGroups(groupsWithDataFlowingIn);
        final Set<ProcessGroup> dataFlowingOut = resolveGroups(groupsWithDataFlowingOut);
        final Map<String, List<ProcessGroup>> reasonInputNotAllowed = new HashMap<>();
        final Map<String, List<ProcessGroup>> reasonOutputNotAllowed = new HashMap<>();

        for (final ProcessGroup group : processGroup.getProcessGroups()) {
            final String inputReason;
            if (group.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
                final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(group);
                inputReason = forbiddenReason == null ? "Input is Allowed" : forbiddenReason.getExplanation();
            } else {
                inputReason = "FlowFile Concurrency is " + group.getFlowFileConcurrency();
            }
            reasonInputNotAllowed.computeIfAbsent(inputReason, k -> new ArrayList<>()).add(group);

            final String outputReason;
            if (group.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT) {
                final String forbiddenReason = getReasonFlowOutOfGroupNotAllowed(group);
                outputReason = forbiddenReason == null ? "Output is Allowed" : forbiddenReason;
            } else {
                outputReason = "FlowFile Outbound Policy is " + group.getFlowFileOutboundPolicy();
            }
            reasonOutputNotAllowed.computeIfAbsent(outputReason, k -> new ArrayList<>()).add(group);
        }

        return new DataValveDiagnostics() {
            @Override
            public Set<ProcessGroup> getGroupsWithDataFlowingIn() {
                return dataFlowingIn;
            }

            @Override
            public Set<ProcessGroup> getGroupsWithDataFlowingOut() {
                return dataFlowingOut;
            }

            @Override
            public Map<String, List<ProcessGroup>> getReasonForInputNotAllowed() {
                return reasonInputNotAllowed;
            }

            @Override
            public Map<String, List<ProcessGroup>> getReasonForOutputNotAllowed() {
                return reasonOutputNotAllowed;
            }
        };
    }

    /**
     * Looks up each identifier as a child of this valve's Process Group.
     *
     * @param groupIds identifiers to resolve
     * @return the set of lookup results. NOTE(review): if {@code getProcessGroup} returns
     *         {@code null} for an unknown identifier, the set will contain {@code null} (as the
     *         previous inline code did) - confirm against the ProcessGroup contract.
     */
    private Set<ProcessGroup> resolveGroups(final Set<String> groupIds) {
        return groupIds.stream()
                .map(id -> processGroup.getProcessGroup(id))
                .collect(Collectors.toSet());
    }

    /**
     * Loads the locally scoped state and adds the stored identifiers to the in-memory
     * inbound/outbound sets. A failure to read the state is logged and otherwise ignored;
     * a state map whose version is empty is treated as "nothing stored".
     */
    private synchronized void recoverState() {
        final StateMap recovered;
        try {
            recovered = stateManager.getState(Scope.LOCAL);
        } catch (Exception e) {
            logger.error("Failed to recover state for {}. This could result in Process Groups configured with a FlowFile Concurrency of SINGLE_BATCH_PER_NODE to get data from multiple batches concurrently or stop ingesting data", this, e);
            return;
        }

        if (recovered.getStateVersion().isEmpty()) {
            logger.debug("No state to recover for {}", this);
            return;
        }

        final List<String> inboundIds = getIdsForKey(recovered, GROUPS_WITH_DATA_FLOWING_IN_STATE_KEY);
        final List<String> outboundIds = getIdsForKey(recovered, GROUPS_WITH_DATA_FLOWING_OUT_STATE_KEY);
        logger.debug("Recovered state for {}; {} Process Groups have data flowing in ({}); {} Process Groups have data flowing out ({})",
                this, inboundIds.size(), inboundIds, outboundIds.size(), outboundIds);

        groupsWithDataFlowingIn.addAll(inboundIds);
        groupsWithDataFlowingOut.addAll(outboundIds);
    }

    /**
     * Reads the value stored under {@code key} and splits it on commas.
     *
     * @param stateMap the state to read from
     * @param key the state key whose value is a comma-separated list of identifiers
     * @return the identifiers, or an empty list when the value is absent or empty
     */
    private List<String> getIdsForKey(StateMap stateMap, String key) {
        final String joinedIds = stateMap.get(key);
        final boolean nothingStored = joinedIds == null || joinedIds.isEmpty();
        return nothingStored
                ? Collections.<String>emptyList()
                : Arrays.asList(joinedIds.split(","));
    }

    /**
     * Persists the current inbound and outbound identifier sets, each joined with commas, to
     * locally scoped state. A failure to write is logged and otherwise ignored.
     */
    private void storeState() {
        final Map<String, String> snapshot = new HashMap<>();
        snapshot.put(GROUPS_WITH_DATA_FLOWING_IN_STATE_KEY, StringUtils.join(groupsWithDataFlowingIn, ","));
        snapshot.put(GROUPS_WITH_DATA_FLOWING_OUT_STATE_KEY, StringUtils.join(groupsWithDataFlowingOut, ","));

        try {
            stateManager.setState(snapshot, Scope.LOCAL);
        } catch (Exception e) {
            logger.error("Failed to store state for {}. If NiFi is restarted before state is properly stored, this could result Process Groups configured with a FlowFile Concurrency of SINGLE_BATCH_PER_NODE to get data from multiple batches concurrently or stop ingesting data", this, e);
        }
    }

    /**
     * @return a short description of this valve that includes its Process Group
     */
    @Override
    public String toString() {
        return "StandardDataValve[group=" + processGroup + "]";
    }

    /**
     * Reasons for which data is not allowed to flow into a Process Group, each carrying a
     * human-readable explanation.
     */
    public enum FlowInForbiddenReason {
        /** The group already holds queued data and its inbound valve is not open. */
        DATA_QUEUED("Process Group already has data queued and valve is not already open to allow data to flow in"),

        /** The group's valve is currently open for outbound data. */
        OPEN_FOR_OUTPUT("Data Valve is already open to allow data to flow out of group"),

        /** A group feeding one of this group's ports is currently allowing data to flow out. */
        SOURCE_FLOWING_OUT("Port has an incoming connection from a Process Group that is currently allowing data to flow out");

        private final String explanation;

        FlowInForbiddenReason(final String explanation) {
            this.explanation = explanation;
        }

        /**
         * @return the human-readable explanation associated with this reason
         */
        public String getExplanation() {
            return explanation;
        }
    }
}

