/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.core;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stateless.bootstrap.RunnableFlow;
import org.apache.nifi.stateless.core.ComponentFactory;
import org.apache.nifi.stateless.core.RegistryUtil;
import org.apache.nifi.stateless.core.SLF4JComponentLog;
import org.apache.nifi.stateless.core.StatelessComponent;
import org.apache.nifi.stateless.core.StatelessControllerServiceLookup;
import org.apache.nifi.stateless.core.StatelessFlowFile;
import org.apache.nifi.stateless.core.StatelessParameterContext;
import org.apache.nifi.stateless.core.StatelessPassThroughComponent;
import org.apache.nifi.stateless.core.StatelessProcessContext;
import org.apache.nifi.stateless.core.StatelessProcessorWrapper;
import org.apache.nifi.stateless.core.StatelessRemoteInputPort;
import org.apache.nifi.stateless.core.StatelessRemoteOutputPort;
import org.apache.nifi.stateless.core.StatelessStateManager;

public class StatelessFlow
implements RunnableFlow {
    // Top-level keys read from the JSON configuration by createAndEnqueueFromJSON.
    public static final String REGISTRY = "registryUrl";
    public static final String BUCKETID = "bucketId";
    public static final String FLOWID = "flowId";
    // Optional; createAndEnqueueFromJSON passes -1 to the registry lookup when it is absent.
    public static final String FLOWVERSION = "flowVersion";
    // Optional; createAndEnqueueFromJSON defaults it to true when absent.
    public static final String MATERIALIZECONTENT = "materializeContent";
    // Optional array of output port ids that are registered as failure ports.
    public static final String FAILUREPORTS = "failurePortIds";
    // Array of flow file objects consumed by enqueueFromJSON.
    public static final String FLOWFILES = "flowFiles";
    // Key inside a flow file object holding its content; every other key becomes an attribute.
    public static final String CONTENT = "nifi_content";
    // Object mapping parameter names to either a plain value or an object with the two keys below.
    public static final String PARAMETERS = "parameters";
    public static final String PARAMETER_SENSITIVE = "sensitive";
    public static final String PARAMETER_VALUE = "value";
    // Keys read by getSSLContext; all but KEY_PASS are required for a context to be built.
    public static final String SSL = "ssl";
    public static final String KEYSTORE = "keystore";
    public static final String KEYSTORE_PASS = "keystorePass";
    // Optional; getSSLContext falls back to the keystore password when it is absent.
    public static final String KEY_PASS = "keyPass";
    public static final String KEYSTORE_TYPE = "keystoreType";
    public static final String TRUSTSTORE = "truststore";
    public static final String TRUSTSTORE_PASS = "truststorePass";
    public static final String TRUSTSTORE_TYPE = "truststoreType";
    // Components that run()/runOnce() execute and shutdown() stops.
    private List<StatelessComponent> roots;
    // Set by shutdown(); polled by run() to leave its loop.
    private volatile boolean stopRequested = false;
    // Component that receives injected flow files; stays null when the flow has no root-level input port connection.
    private StatelessComponent sourceComponent = null;
    // Null when the flow is built from pre-constructed roots instead of a versioned process group.
    private final ComponentFactory componentFactory;

    /**
     * Creates a flow whose only root is the supplied processor wrapper.
     *
     * @param root the single component from which execution starts
     */
    public StatelessFlow(StatelessProcessorWrapper root) {
        this(Collections.<StatelessComponent>singletonList(root));
    }

    /**
     * Creates a flow from already-constructed root components.
     * No component factory is created for a flow built this way.
     *
     * @param roots the components from which execution starts
     */
    public StatelessFlow(List<StatelessComponent> roots) {
        this.componentFactory = null;
        this.roots = roots;
    }

    /**
     * Builds a runnable flow from a versioned process group.
     * <p>
     * The constructor collects processors, remote process groups and connections from the group,
     * creates and enables the group's controller services, and then walks every connection to
     * create the source and destination components and link them as parent/child. Components
     * that end up without parents become the roots executed by {@link #run} and {@link #runOnce}.
     *
     * @param flow               the versioned process group to materialize
     * @param extensionManager   used to create the component factory
     * @param variableRegistry   passed to every created controller service and processor
     * @param failureOutputPorts ids of root-level output ports to register as failure ports
     * @param materializeContent passed through to every created processor
     * @param sslContext         passed to created remote input/output ports; may be null
     * @param parameterContext   parameter lookup handed to services and processors
     * @throws IllegalArgumentException if the group has more than one input port, a connection
     *         references an unknown processor or remote port, a connection uses an unsupported
     *         source/destination type, or an input port is wired directly to an output port
     * @throws ProcessorInstantiationException declared for component creation failures
     * @throws InitializationException declared for controller service initialization failures
     */
    public StatelessFlow(VersionedProcessGroup flow, ExtensionManager extensionManager, VariableRegistry variableRegistry, List<String> failureOutputPorts, boolean materializeContent, SSLContext sslContext, ParameterContext parameterContext) throws ProcessorInstantiationException, InitializationException {
        this.componentFactory = new ComponentFactory(extensionManager);
        Map<String, VersionedProcessor> processors = this.findProcessorsRecursive(flow).stream().collect(Collectors.toMap(VersionedComponent::getIdentifier, proc -> proc));
        Map<String, VersionedRemoteProcessGroup> rpgs = new HashMap<>();
        Map<String, VersionedRemoteGroupPort> remotePorts = new HashMap<>();
        this.findRemoteGroupRecursive(flow, rpgs, remotePorts);
        Set<VersionedConnection> connections = this.findConnectionsRecursive(flow);
        if (flow.getInputPorts().size() > 1) {
            throw new IllegalArgumentException("Only one input port per flow is allowed");
        }

        // Create every controller service of the group and apply its configured and default properties.
        StatelessControllerServiceLookup serviceLookup = new StatelessControllerServiceLookup(parameterContext);
        Set<VersionedControllerService> controllerServices = flow.getControllerServices();
        for (VersionedControllerService versionedControllerService : controllerServices) {
            StatelessStateManager stateManager = new StatelessStateManager();
            ControllerService service = this.componentFactory.createControllerService(versionedControllerService, variableRegistry, serviceLookup, stateManager, (ParameterLookup)parameterContext);
            serviceLookup.addControllerService(service, versionedControllerService.getName());
            serviceLookup.setControllerServiceAnnotationData(service, versionedControllerService.getAnnotationData());
            SLF4JComponentLog logger = new SLF4JComponentLog(service);
            StatelessProcessContext processContext = new StatelessProcessContext((ConfigurableComponent)service, serviceLookup, versionedControllerService.getName(), logger, stateManager, variableRegistry, parameterContext);
            Map<String, String> versionedPropertyValues = versionedControllerService.getProperties();
            for (Map.Entry<String, String> entry : versionedPropertyValues.entrySet()) {
                String propertyName = entry.getKey();
                String propertyValue = entry.getValue();
                PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName);
                serviceLookup.setControllerServiceProperty(service, descriptor, processContext, variableRegistry, propertyValue);
            }
            // Properties with no versioned value fall back to the descriptor's default, when one exists.
            for (PropertyDescriptor descriptor : service.getPropertyDescriptors()) {
                String versionedPropertyValue = versionedPropertyValues.get(descriptor.getName());
                if (versionedPropertyValue != null || descriptor.getDefaultValue() == null) continue;
                serviceLookup.setControllerServiceProperty(service, descriptor, processContext, variableRegistry, descriptor.getDefaultValue());
            }
        }
        serviceLookup.enableControllerServices(variableRegistry);

        // Components are created lazily, keyed by their versioned id, while walking the connections.
        Map<String, StatelessComponent> componentMap = new HashMap<>();
        for (VersionedConnection connection : connections) {
            VersionedRemoteProcessGroup rpg;
            VersionedRemoteGroupPort remotePort;
            boolean isInputPortConnection = false;
            ConnectableComponent source = connection.getSource();
            ConnectableComponent destination = connection.getDestination();
            // Local variable; deliberately distinct from the field this.sourceComponent.
            StatelessComponent sourceComponent = null;
            if (componentMap.containsKey(source.getId())) {
                sourceComponent = componentMap.get(source.getId());
            } else {
                switch (source.getType()) {
                    case PROCESSOR: {
                        VersionedProcessor processor = processors.get(source.getId());
                        if (processor == null) {
                            throw new IllegalArgumentException("Unknown input processor. " + source.getId());
                        }
                        sourceComponent = this.componentFactory.createProcessor(processor, materializeContent, serviceLookup, variableRegistry, null, parameterContext);
                        componentMap.put(source.getId(), sourceComponent);
                        break;
                    }
                    case REMOTE_INPUT_PORT: {
                        throw new IllegalArgumentException("Unsupported source type: " + source.getType());
                    }
                    case REMOTE_OUTPUT_PORT: {
                        remotePort = remotePorts.get(source.getId());
                        if (remotePort == null) {
                            throw new IllegalArgumentException("Unknown remote output port. " + source.getId());
                        }
                        rpg = rpgs.get(remotePort.getRemoteGroupId());
                        sourceComponent = new StatelessRemoteOutputPort(rpg, remotePort, sslContext);
                        componentMap.put(source.getId(), sourceComponent);
                        break;
                    }
                    case OUTPUT_PORT: 
                    case FUNNEL: {
                        sourceComponent = new StatelessPassThroughComponent();
                        componentMap.put(source.getId(), sourceComponent);
                        break;
                    }
                    case INPUT_PORT: {
                        // A root-level input port is the injection point; it gets no component of its own.
                        if (flow.getIdentifier().equals(connection.getGroupIdentifier())) {
                            isInputPortConnection = true;
                            break;
                        }
                        sourceComponent = new StatelessPassThroughComponent();
                        componentMap.put(source.getId(), sourceComponent);
                    }
                }
            }
            StatelessComponent destinationComponent = null;
            switch (destination.getType()) {
                case PROCESSOR: {
                    if (componentMap.containsKey(destination.getId())) {
                        destinationComponent = componentMap.get(destination.getId());
                        break;
                    }
                    VersionedProcessor processor = processors.get(destination.getId());
                    if (processor == null) {
                        // Fail loudly, mirroring the source side. Returning here would leave
                        // 'roots' unassigned and the flow unusable.
                        throw new IllegalArgumentException("Unknown destination processor. " + destination.getId());
                    }
                    destinationComponent = this.componentFactory.createProcessor(processor, materializeContent, serviceLookup, variableRegistry, null, parameterContext);
                    // NOTE(review): addParent is invoked again after the switch for non-input-port
                    // connections; whether that records the parent twice depends on addParent - confirm.
                    destinationComponent.addParent(sourceComponent);
                    componentMap.put(destination.getId(), destinationComponent);
                    break;
                }
                case REMOTE_INPUT_PORT: {
                    if (componentMap.containsKey(destination.getId())) {
                        destinationComponent = componentMap.get(destination.getId());
                        break;
                    }
                    remotePort = remotePorts.get(destination.getId());
                    if (remotePort == null) {
                        throw new IllegalArgumentException("Unknown remote input port. " + destination.getId());
                    }
                    rpg = rpgs.get(remotePort.getRemoteGroupId());
                    destinationComponent = new StatelessRemoteInputPort(rpg, remotePort, sslContext);
                    destinationComponent.addParent(sourceComponent);
                    componentMap.put(destination.getId(), destinationComponent);
                    break;
                }
                case REMOTE_OUTPUT_PORT: {
                    throw new IllegalArgumentException("Unsupported destination type: " + destination.getType());
                }
                case OUTPUT_PORT: {
                    if (isInputPortConnection) {
                        throw new IllegalArgumentException("Input ports can not be mapped directly to output ports...");
                    }
                    // A root-level output port is registered on the source instead of becoming a component.
                    if (flow.getIdentifier().equals(connection.getGroupIdentifier())) {
                        for (String selectedRelationship : connection.getSelectedRelationships()) {
                            Relationship relationship = new Relationship.Builder().name(selectedRelationship).build();
                            boolean failurePort = failureOutputPorts.contains(destination.getId());
                            sourceComponent.addOutputPort(relationship, failurePort);
                        }
                        break;
                    }
                    // Output ports of nested groups fall through and are treated as pass-through components.
                }
                case FUNNEL: 
                case INPUT_PORT: {
                    if (componentMap.containsKey(destination.getId())) {
                        destinationComponent = componentMap.get(destination.getId());
                        break;
                    }
                    destinationComponent = new StatelessPassThroughComponent();
                    componentMap.put(destination.getId(), destinationComponent);
                }
            }
            if (destinationComponent == null) continue;
            destinationComponent.addIncomingConnection(connection.getIdentifier());
            if (isInputPortConnection) {
                // The destination of the root input port is where injected flow files are enqueued.
                this.sourceComponent = destinationComponent;
                continue;
            }
            destinationComponent.addParent(sourceComponent);
            for (String relationship : connection.getSelectedRelationships()) {
                sourceComponent.addChild(destinationComponent, new Relationship.Builder().name(relationship).build());
            }
        }
        this.roots = componentMap.values().stream().filter(statelessComponent -> statelessComponent.getParents().isEmpty()).collect(Collectors.toList());
    }

    /**
     * Collects the processors of the given group and of all groups nested below it.
     *
     * @param group the group at which collection starts
     * @return a new set holding every processor found
     */
    private Set<VersionedProcessor> findProcessorsRecursive(VersionedProcessGroup group) {
        final Set<VersionedProcessor> collected = new HashSet<>();
        findProcessorsRecursive(group, collected);
        return collected;
    }

    /**
     * Adds the processors of the given group to the accumulator, then descends into each child group.
     *
     * @param group      the group being visited
     * @param processors accumulator that receives the processors
     */
    private void findProcessorsRecursive(VersionedProcessGroup group, Set<VersionedProcessor> processors) {
        processors.addAll(group.getProcessors());
        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
            findProcessorsRecursive(childGroup, processors);
        }
    }

    /**
     * Collects the connections of the given group and of all groups nested below it.
     *
     * @param group the group at which collection starts
     * @return a new set holding every connection found
     */
    private Set<VersionedConnection> findConnectionsRecursive(VersionedProcessGroup group) {
        final Set<VersionedConnection> collected = new HashSet<>();
        findConnectionsRecursive(group, collected);
        return collected;
    }

    /**
     * Adds the connections of the given group to the accumulator, then descends into each child group.
     *
     * @param group       the group being visited
     * @param connections accumulator that receives the connections
     */
    private void findConnectionsRecursive(VersionedProcessGroup group, Set<VersionedConnection> connections) {
        connections.addAll(group.getConnections());
        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
            findConnectionsRecursive(childGroup, connections);
        }
    }

    /**
     * Indexes the remote process groups of the given group, and of every group nested below it,
     * together with their input and output ports.
     * <p>
     * Connections are collected across nested groups, so remote ports must be indexed across
     * nested groups as well; otherwise a nested connection that references a remote port cannot
     * be resolved.
     *
     * @param group the group being visited
     * @param rpgs  receives each remote process group keyed by its identifier
     * @param ports receives each remote input/output port keyed by its identifier
     */
    private void findRemoteGroupRecursive(VersionedProcessGroup group, Map<String, VersionedRemoteProcessGroup> rpgs, Map<String, VersionedRemoteGroupPort> ports) {
        for (VersionedRemoteProcessGroup rpg : group.getRemoteProcessGroups()) {
            rpgs.put(rpg.getIdentifier(), rpg);
            for (VersionedRemoteGroupPort port : rpg.getInputPorts()) {
                ports.put(port.getIdentifier(), port);
            }
            for (VersionedRemoteGroupPort port : rpg.getOutputPorts()) {
                ports.put(port.getIdentifier(), port);
            }
        }
        // Descend into child groups, matching findProcessorsRecursive and findConnectionsRecursive.
        for (VersionedProcessGroup child : group.getProcessGroups()) {
            findRemoteGroupRecursive(child, rpgs, ports);
        }
    }

    /**
     * Executes every root component over and over until {@link #shutdown()} is called
     * or a root reports failure.
     *
     * @param output queue handed to each root's recursive run
     * @return {@code false} as soon as a root reports failure; {@code true} once a stop has been requested
     */
    @Override
    public boolean run(Queue<InMemoryFlowFile> output) {
        while (!stopRequested) {
            for (final StatelessComponent root : roots) {
                if (!root.runRecursive(output)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Executes every root component a single time, stopping at the first failure.
     *
     * @param output queue handed to each root's recursive run
     * @return {@code true} if every root succeeded (or there are no roots), {@code false} otherwise
     */
    @Override
    public boolean runOnce(Queue<InMemoryFlowFile> output) {
        // allMatch short-circuits on the first failing root, in list order.
        return roots.stream().allMatch(root -> root.runRecursive(output));
    }

    /**
     * Requests that {@link #run} stop looping and shuts down every root component.
     */
    public void shutdown() {
        stopRequested = true;
        for (final StatelessComponent root : roots) {
            root.shutdown();
        }
    }

    /**
     * Builds an SSL context from the {@code ssl} section of the configuration.
     *
     * @param config the flow configuration
     * @return the SSL context, or {@code null} when the {@code ssl} section is absent or lacks any of
     *         the keystore, keystore password, keystore type, truststore, truststore password or
     *         truststore type entries
     * @throws RuntimeException if the context cannot be created from the supplied settings
     */
    public static SSLContext getSSLContext(JsonObject config) {
        if (!config.has(SSL)) {
            return null;
        }
        final JsonObject ssl = config.get(SSL).getAsJsonObject();
        final boolean fullyConfigured = ssl.has(KEYSTORE)
                && ssl.has(KEYSTORE_PASS)
                && ssl.has(KEYSTORE_TYPE)
                && ssl.has(TRUSTSTORE)
                && ssl.has(TRUSTSTORE_PASS)
                && ssl.has(TRUSTSTORE_TYPE);
        if (!fullyConfigured) {
            return null;
        }

        final String keystorePath = ssl.get(KEYSTORE).getAsString();
        final String keystorePassword = ssl.get(KEYSTORE_PASS).getAsString();
        // The key password is optional and defaults to the keystore password.
        final String keyPassword;
        if (ssl.has(KEY_PASS)) {
            keyPassword = ssl.get(KEY_PASS).getAsString();
        } else {
            keyPassword = keystorePassword;
        }
        final String keystoreKind = ssl.get(KEYSTORE_TYPE).getAsString();
        final String truststorePath = ssl.get(TRUSTSTORE).getAsString();
        final String truststorePassword = ssl.get(TRUSTSTORE_PASS).getAsString();
        final String truststoreKind = ssl.get(TRUSTSTORE_TYPE).getAsString();

        try {
            return SslContextFactory.createSslContext(
                    keystorePath,
                    keystorePassword.toCharArray(),
                    keyPassword.toCharArray(),
                    keystoreKind,
                    truststorePath,
                    truststorePassword.toCharArray(),
                    truststoreKind,
                    SslContextFactory.ClientAuth.REQUIRED,
                    "TLS");
        } catch (Exception e) {
            throw new RuntimeException("Failed to create Keystore", e);
        }
    }

    /**
     * Creates a flow from a JSON configuration and enqueues any flow files the configuration lists.
     * <p>
     * The configuration must contain {@code registryUrl}, {@code bucketId} and {@code flowId}.
     * Optional entries are {@code flowVersion} (defaults to -1), {@code materializeContent}
     * (defaults to true), {@code failurePortIds}, {@code ssl}, {@code parameters} and
     * {@code flowFiles}. The flow snapshot is fetched from the registry, its variables become the
     * variable registry, and the {@code parameters} section becomes the parameter context.
     *
     * @param args              the JSON configuration
     * @param systemClassLoader class loader passed to extension discovery
     * @param narWorkingDir     NAR working directory passed to extension discovery
     * @return the constructed flow, with configured flow files already enqueued
     * @throws IllegalArgumentException if {@code args} is null or a required entry is missing
     * @throws IllegalStateException if a parameter is defined twice or an object-valued parameter has no value
     * @throws InitializationException declared by flow construction
     * @throws IOException declared by the registry lookup / extension discovery
     * @throws ProcessorInstantiationException declared by flow construction
     * @throws NiFiRegistryException declared by the registry lookup
     */
    public static StatelessFlow createAndEnqueueFromJSON(JsonObject args, ClassLoader systemClassLoader, File narWorkingDir) throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException {
        if (args == null) {
            throw new IllegalArgumentException("Flow arguments can not be null");
        }
        // Keystore passwords and parameter values must not reach stdout, so print a masked copy.
        System.out.println("Running flow from json: " + redactSecrets(args));
        if (!(args.has(REGISTRY) && args.has(BUCKETID) && args.has(FLOWID))) {
            throw new IllegalArgumentException("The following parameters must be provided: registryUrl, bucketId, flowId");
        }
        String registryurl = args.getAsJsonPrimitive(REGISTRY).getAsString();
        String bucketID = args.getAsJsonPrimitive(BUCKETID).getAsString();
        String flowID = args.getAsJsonPrimitive(FLOWID).getAsString();
        int flowVersion = args.has(FLOWVERSION) ? args.getAsJsonPrimitive(FLOWVERSION).getAsInt() : -1;
        boolean materializeContent = !args.has(MATERIALIZECONTENT) || args.getAsJsonPrimitive(MATERIALIZECONTENT).getAsBoolean();
        List<String> failurePorts = new ArrayList<>();
        if (args.has(FAILUREPORTS)) {
            for (JsonElement port : args.getAsJsonArray(FAILUREPORTS)) {
                failurePorts.add(port.getAsString());
            }
        }
        SSLContext sslContext = StatelessFlow.getSSLContext(args);
        VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion);

        // The variables declared on the flow's root group back the variable registry.
        Map<VariableDescriptor, String> inputVariables = new HashMap<>();
        VersionedProcessGroup versionedGroup = snapshot.getFlowContents();
        if (versionedGroup != null) {
            for (Map.Entry<String, String> variable : versionedGroup.getVariables().entrySet()) {
                inputVariables.put(new VariableDescriptor(variable.getKey()), variable.getValue());
            }
        }

        Set<Parameter> parameters = new HashSet<>();
        if (args.has(PARAMETERS)) {
            Set<String> parameterNames = new HashSet<>();
            JsonObject parametersObject = args.get(PARAMETERS).getAsJsonObject();
            for (Map.Entry<String, JsonElement> entry : parametersObject.entrySet()) {
                String parameterName = entry.getKey();
                if (!parameterNames.add(parameterName)) {
                    throw new IllegalStateException("Cannot parse configuration because Parameter '" + parameterName + "' has been defined twice");
                }
                parameters.add(parseParameter(parameterName, entry.getValue()));
            }
        }
        StatelessParameterContext parameterContext = new StatelessParameterContext(parameters);
        ExtensionManager extensionManager = ExtensionDiscovery.discover(narWorkingDir, systemClassLoader);
        StatelessFlow flow = new StatelessFlow(versionedGroup, extensionManager, () -> inputVariables, failurePorts, materializeContent, sslContext, parameterContext);
        flow.enqueueFromJSON(args);
        return flow;
    }

    /**
     * Returns a shallow copy of the configuration with the {@code ssl} and {@code parameters}
     * sections replaced by a mask, so the result can be printed without exposing secrets.
     */
    private static JsonObject redactSecrets(JsonObject args) {
        JsonObject redacted = new JsonObject();
        for (Map.Entry<String, JsonElement> entry : args.entrySet()) {
            String key = entry.getKey();
            if (SSL.equals(key) || PARAMETERS.equals(key)) {
                redacted.addProperty(key, "********");
            } else {
                redacted.add(key, entry.getValue());
            }
        }
        return redacted;
    }

    /**
     * Converts one entry of the {@code parameters} section into a Parameter. The entry is either
     * a plain value, or an object with a required {@code value} and an optional {@code sensitive} flag.
     */
    private static Parameter parseParameter(String parameterName, JsonElement valueElement) {
        if (!valueElement.isJsonObject()) {
            String parameterValue = valueElement.getAsString();
            ParameterDescriptor descriptor = new ParameterDescriptor.Builder().name(parameterName).build();
            return new Parameter(descriptor, parameterValue);
        }
        JsonObject valueObject = valueElement.getAsJsonObject();
        boolean sensitive = valueObject.has(PARAMETER_SENSITIVE) && valueObject.get(PARAMETER_SENSITIVE).getAsBoolean();
        if (!valueObject.has(PARAMETER_VALUE)) {
            throw new IllegalStateException("Cannot parse configuration because Parameter '" + parameterName + "' does not have a value associated with it");
        }
        String value = valueObject.get(PARAMETER_VALUE).getAsString();
        ParameterDescriptor descriptor = new ParameterDescriptor.Builder().name(parameterName).sensitive(sensitive).build();
        return new Parameter(descriptor, value);
    }

    /**
     * Injects a single flow file into the flow's entry component.
     *
     * @param content    the flow file content
     * @param attributes the flow file attributes
     * @throws IllegalArgumentException if the flow has no entry component (no input port)
     */
    public void enqueueFlowFile(byte[] content, Map<String, String> attributes) {
        if (this.sourceComponent == null) {
            throw new IllegalArgumentException("Flow does not have an input port...");
        }
        final LinkedList<StatelessFlowFile> pending = new LinkedList<>();
        final boolean materialize = this.sourceComponent.isMaterializeContent();
        final StatelessFlowFile flowFile = new StatelessFlowFile(content, attributes, materialize);
        pending.add(flowFile);
        this.sourceComponent.enqueueAll(pending);
    }

    /**
     * Injects the flow files listed under {@code flowFiles} in the given JSON into the flow's
     * entry component. Within each flow file object the {@code nifi_content} entry supplies the
     * content and every other entry becomes an attribute.
     * Does nothing when the {@code flowFiles} entry is absent or empty.
     *
     * @param json the configuration that may contain a {@code flowFiles} array
     * @throws IllegalStateException if flow files are listed but the flow has no entry component
     */
    public void enqueueFromJSON(JsonObject json) {
        if (!json.has(FLOWFILES)) {
            return;
        }
        final JsonArray listed = json.getAsJsonArray(FLOWFILES);
        if (listed.size() == 0) {
            return;
        }
        if (this.sourceComponent == null) {
            throw new IllegalStateException("Configuration specifies to inject " + listed.size() + " FlowFiles into the flow, but the Flow does not contain an Input Port.");
        }

        final LinkedList<StatelessFlowFile> pending = new LinkedList<>();
        for (final JsonElement element : listed) {
            final JsonObject fileObject = element.getAsJsonObject();
            final String content = fileObject.getAsJsonPrimitive(CONTENT).getAsString();
            final Map<String, String> attributes = new HashMap<>();
            for (final Map.Entry<String, JsonElement> field : fileObject.entrySet()) {
                if (CONTENT.equals(field.getKey())) {
                    continue;
                }
                attributes.put(field.getKey(), field.getValue().getAsString());
            }
            pending.add(new StatelessFlowFile(content, attributes, this.sourceComponent.isMaterializeContent()));
        }
        this.sourceComponent.enqueueAll(pending);
    }
}

