/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.py4j;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.py4j.PythonProcess;
import org.apache.nifi.py4j.StandardBoundObjectCounts;
import org.apache.nifi.py4j.logback.LevelChangeListener;
import org.apache.nifi.py4j.logging.LogLevelChangeHandler;
import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler;
import org.apache.nifi.python.BoundObjectCounts;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.FlowFileSource;
import org.apache.nifi.python.processor.FlowFileSourceProxy;
import org.apache.nifi.python.processor.FlowFileTransform;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.RecordTransform;
import org.apache.nifi.python.processor.RecordTransformProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PythonBridge} implementation that manages one "Controller" {@link PythonProcess}, used for
 * extension discovery and processor-type queries, plus a set of additional {@link PythonProcess}es
 * that host the Python processors themselves, grouped by processor type and version.
 *
 * <p>Methods that create, remove or enumerate processes are {@code synchronized} on this instance.
 * The per-type process lists are {@link CopyOnWriteArrayList}s because they are also touched from
 * the background thread started in {@link #onProcessorRemoved(String, String, String)}.</p>
 */
public class StandardPythonBridge implements PythonBridge {
    private static final Logger logger = LoggerFactory.getLogger(StandardPythonBridge.class);

    /** Set once the controller process has started; cleared by {@link #shutdown()}. */
    private volatile boolean running = false;
    private PythonProcessConfig processConfig;
    private ControllerServiceTypeLookup serviceTypeLookup;
    private Supplier<Set<File>> narDirectoryLookup;
    private PythonProcess controllerProcess;

    /** Number of processors created (and not yet removed) per processor type/version. */
    private final Map<ExtensionId, Integer> processorCountByType = new ConcurrentHashMap<>();

    /** Python processes launched for each processor type/version. */
    private final Map<ExtensionId, List<PythonProcess>> processesByProcessorType = new ConcurrentHashMap<>();

    /**
     * Captures the configuration and lookups supplied by the framework. Does not launch any process.
     *
     * @param context source of the process configuration, controller-service type lookup and NAR directory lookup
     */
    public void initialize(PythonBridgeInitializationContext context) {
        this.processConfig = context.getPythonProcessConfig();
        this.serviceTypeLookup = context.getControllerServiceTypeLookup();
        this.narDirectoryLookup = context.getNarDirectoryLookup();
    }

    /**
     * Registers the log-level change listener and launches the controller Python process.
     * Calling this method while already running is a no-op.
     *
     * @throws IOException if the controller process cannot be started; {@link #shutdown()} is invoked before rethrowing
     */
    public synchronized void start() throws IOException {
        if (running) {
            logger.debug("{} already started, will not start again", this);
            return;
        }

        logger.debug("{} launching Python Process", this);
        try {
            LogLevelChangeHandler logLevelChangeHandler = StandardLogLevelChangeHandler.getHandler();
            LevelChangeListener.registerLogbackListener(logLevelChangeHandler);

            File envHome = new File(processConfig.getPythonWorkingDirectory(), "controller");
            controllerProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, true, "Controller", "Controller");
            controllerProcess.start();
            running = true;
        } catch (Exception e) {
            // Tear down whatever was partially started before propagating the failure.
            shutdown();
            throw e;
        }
    }

    /**
     * Asks the controller process to discover extensions in the configured extension directories.
     *
     * @param includeNarDirectories whether the directories returned by the NAR directory lookup are searched as well
     * @throws IllegalStateException if the bridge is not running
     */
    public void discoverExtensions(boolean includeNarDirectories) {
        ensureStarted();
        List<String> extensionsDirs = getExtensionDirectoryPaths(includeNarDirectories);
        String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
        controllerProcess.discoverExtensions(extensionsDirs, workDirPath);
    }

    /**
     * Asks the controller process to discover extensions in the given directories only.
     *
     * @param extensionDirectories directories to search
     * @throws IllegalStateException if the bridge is not running
     */
    public void discoverExtensions(List<File> extensionDirectories) {
        ensureStarted();
        List<String> extensionsDirs = extensionDirectories.stream().map(File::getAbsolutePath).toList();
        String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
        controllerProcess.discoverExtensions(extensionsDirs, workDirPath);
    }

    /**
     * Selects (or launches) a Python process for the given processor type and creates the processor in it.
     * The per-type processor count is incremented only after the processor has been created successfully.
     *
     * @throws IllegalStateException if the bridge is not running
     * @throws IllegalArgumentException if no processor with the given type and version is known to the controller
     */
    private PythonProcessorBridge createProcessorBridge(String identifier, String type, String version, boolean preferIsolatedProcess) {
        ensureStarted();

        // Query the controller once and derive both the details and the extension id from the same entry.
        PythonProcessorDetails processorDetails = findProcessorDetails(getProcessorTypes(), type, version)
                .orElseThrow(() -> new IllegalArgumentException("Processor Type [%s] Version [%s] not found".formatted(type, version)));
        ExtensionId extensionId = new ExtensionId(processorDetails.getProcessorType(), processorDetails.getProcessorVersion());
        logger.debug("Creating Python Processor Type [{}] Version [{}]", extensionId.type(), extensionId.version());

        String processorHome = processorDetails.getExtensionHome();
        boolean bundledWithDependencies = processorDetails.isBundledWithDependencies();
        PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, processorHome, preferIsolatedProcess, bundledWithDependencies);

        String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
        PythonProcessorBridge processorBridge = pythonProcess.createProcessor(identifier, type, version, workDirPath, preferIsolatedProcess);
        processorCountByType.merge(extensionId, 1, Integer::sum);
        return processorBridge;
    }

    /**
     * Creates a proxy for a Python processor. The proxy receives a factory that creates the actual
     * processor bridge on demand; this method itself does not select or launch a Python process.
     *
     * @param identifier component identifier of the processor
     * @param type processor type name
     * @param version processor version
     * @param preferIsolatedProcess forwarded to process selection when the bridge is eventually created
     * @param initialize forwarded to the proxy constructor
     * @return a proxy matching the interface the processor implements, or {@code null} if that interface is
     *         none of {@link FlowFileTransform}, {@link RecordTransform} or {@link FlowFileSource}
     * @throws IllegalStateException if the bridge is not running
     * @throws IllegalArgumentException if the type/version is unknown
     */
    public AsyncLoadedProcessor createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess, boolean initialize) {
        PythonProcessorDetails processorDetails = findProcessorDetails(getProcessorTypes(), type, version)
                .orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type [%s] or version [%s]".formatted(type, version)));

        String implementedInterface = processorDetails.getInterface();
        Supplier<PythonProcessorBridge> processorBridgeFactory = () -> createProcessorBridge(identifier, type, version, preferIsolatedProcess);

        if (FlowFileTransform.class.getName().equals(implementedInterface)) {
            return new FlowFileTransformProxy(type, processorBridgeFactory, initialize);
        }
        if (RecordTransform.class.getName().equals(implementedInterface)) {
            return new RecordTransformProxy(type, processorBridgeFactory, initialize);
        }
        if (FlowFileSource.class.getName().equals(implementedInterface)) {
            return new FlowFileSourceProxy(type, processorBridgeFactory, initialize);
        }
        // Unrecognized interface: callers receive null (existing contract, kept as-is).
        return null;
    }

    /**
     * Removes a processor from whichever process hosts it and, if that leaves the process without any
     * processors, shuts the process down. The removal itself runs on a virtual thread; the per-type
     * processor count is decremented immediately.
     *
     * @param identifier component identifier of the processor being removed
     * @param type processor type name
     * @param version processor version
     */
    public synchronized void onProcessorRemoved(String identifier, String type, String version) {
        Optional<ExtensionId> extensionIdFound = findExtensionId(type, version);
        if (extensionIdFound.isEmpty()) {
            logger.debug("Processor Type [{}] Version [{}] not found", type, version);
            return;
        }

        ExtensionId extensionId = extensionIdFound.get();
        List<PythonProcess> processes = processesByProcessorType.get(extensionId);
        if (processes == null) {
            return;
        }

        Thread.ofVirtual().name("Remove Python Processor " + identifier).start(() -> {
            try {
                PythonProcess emptied = null;
                for (PythonProcess process : processes) {
                    boolean removed = process.removeProcessor(identifier);
                    if (removed && process.getProcessorCount() == 0) {
                        emptied = process;
                        break;
                    }
                }
                if (emptied != null) {
                    processes.remove(emptied);
                    emptied.shutdown();
                }
            } catch (Exception e) {
                logger.error("Failed to trigger removal of Python Processor with ID {}", identifier, e);
            }
        });

        // Decrement, but never below zero: a negative count would yield a negative process index
        // (count % maxProcessesPerType) the next time a processor of this type is created.
        // Dropping the entry at zero is equivalent to storing 0 because readers use getOrDefault(..., 0).
        processorCountByType.computeIfPresent(extensionId, (key, count) -> count > 1 ? count - 1 : null);
    }

    /**
     * @return the number of processor-hosting Python processes across all processor types
     *         (the controller process is not included)
     */
    public int getTotalProcessCount() {
        int total = 0;
        for (List<PythonProcess> processes : processesByProcessorType.values()) {
            total += processes.size();
        }
        return total;
    }

    /**
     * Chooses the Python process in which the next processor of the given type is created.
     * <ol>
     *   <li>An existing process of this type is reused if isolation is not preferred, or if the process
     *       does not already contain an isolated processor.</li>
     *   <li>Otherwise, if the per-type limit already covers the computed process index, the process at
     *       that index is reused and a warning is logged.</li>
     *   <li>Otherwise a new process is launched, subject to the global process limit.</li>
     * </ol>
     *
     * @throws IllegalStateException if a new process is needed but the global maximum has been reached
     * @throws RuntimeException wrapping the {@link IOException} if the new process fails to launch
     */
    private synchronized PythonProcess getProcessForNextComponent(ExtensionId extensionId, String componentId, String processorHome,
                                                                  boolean preferIsolatedProcess, boolean packagedWithDependencies) {
        int processorsOfThisType = processorCountByType.getOrDefault(extensionId, 0);
        int processIndex = processorsOfThisType % processConfig.getMaxPythonProcessesPerType();
        List<PythonProcess> processesForType = processesByProcessorType.computeIfAbsent(extensionId, key -> new CopyOnWriteArrayList<>());

        for (PythonProcess candidate : processesForType) {
            if (!preferIsolatedProcess || !candidate.containsIsolatedProcessor()) {
                logger.debug("Using {} to create Processor of type {}", candidate, extensionId.type());
                return candidate;
            }
        }

        if (processesForType.size() > processIndex) {
            PythonProcess existing = processesForType.get(processIndex);
            logger.warn("Using existing process {} to create Processor of type {} because configuration indicates that no more than {} processes should be created for any Processor Type. This may result in slower performance for Processors of this type",
                    existing, extensionId.type(), processConfig.getMaxPythonProcessesPerType());
            return existing;
        }

        int totalProcessCount = getTotalProcessCount();
        if (totalProcessCount >= processConfig.getMaxPythonProcesses()) {
            throw new IllegalStateException("Cannot launch new Python Process because the maximum number of processes allowed, according to nifi.properties, is " + processConfig.getMaxPythonProcesses() + " and there are currently " + totalProcessCount + " processes active");
        }

        logger.info("In order to create Python Processor of type {}, launching a new Python Process because there are currently {} Python Processors of this type and {} Python Processes",
                extensionId.type(), processorsOfThisType, processesForType.size());
        try {
            PythonProcess launched = launchProcess(extensionId, componentId, processorHome, packagedWithDependencies);
            processesForType.add(launched);
            return launched;
        } catch (IOException ioe) {
            String message = String.format("Failed to launch Process for Python Processor [%s] Version [%s]", extensionId.type(), extensionId.version());
            throw new RuntimeException(message, ioe);
        }
    }

    /**
     * Starts a new Python process for the given extension and has it discover all extension directories
     * (including NAR directories). If starting or discovery fails, the process is shut down before the
     * failure is propagated so that it is not left running unregistered.
     */
    private PythonProcess launchProcess(ExtensionId extensionId, String componentId, String processorHome, boolean packagedWithDependencies) throws IOException {
        File envHome = resolveEnvironmentHome(extensionId, processorHome, packagedWithDependencies);
        PythonProcess pythonProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, packagedWithDependencies, extensionId.type(), componentId);
        try {
            pythonProcess.start();
            List<String> extensionsDirs = getExtensionDirectoryPaths(true);
            String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
            pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
            return pythonProcess;
        } catch (IOException | RuntimeException e) {
            try {
                pythonProcess.shutdown();
            } catch (RuntimeException shutdownFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(shutdownFailure);
            }
            throw e;
        }
    }

    /**
     * Determines the environment home passed to a new process: the processor's own home when it is
     * packaged with its dependencies, otherwise {@code <workDir>/extensions/<type>/<version>}.
     */
    private File resolveEnvironmentHome(ExtensionId extensionId, String processorHome, boolean packagedWithDependencies) {
        if (packagedWithDependencies) {
            return new File(processorHome);
        }
        File extensionsWorkDir = new File(processConfig.getPythonWorkingDirectory(), "extensions");
        File componentTypeHome = new File(extensionsWorkDir, extensionId.type());
        return new File(componentTypeHome, extensionId.version());
    }

    /**
     * @return the processor types currently reported by the controller process
     * @throws IllegalStateException if the bridge is not running
     */
    public List<PythonProcessorDetails> getProcessorTypes() {
        ensureStarted();
        return controllerProcess.getCurrentController().getProcessorTypes();
    }

    /**
     * @return a new map from {@code "<type> version <version>"} to the number of Python processes
     *         currently registered for that processor type
     */
    public synchronized Map<String, Integer> getProcessCountsPerType() {
        Map<String, Integer> counts = new HashMap<>(processesByProcessorType.size());
        for (Map.Entry<ExtensionId, List<PythonProcess>> entry : processesByProcessorType.entrySet()) {
            ExtensionId extensionId = entry.getKey();
            counts.put(extensionId.type() + " version " + extensionId.version(), entry.getValue().size());
        }
        return counts;
    }

    /**
     * Asks the controller process to remove the given processor type.
     *
     * @throws IllegalStateException if the bridge is not running
     */
    public void removeProcessorType(String type, String version) {
        ensureStarted();
        controllerProcess.getCurrentController().removeProcessorType(type, version);
    }

    /**
     * @return one {@link BoundObjectCounts} per processor-hosting Python process, built from the
     *         Java object binding counts that process reports
     */
    public synchronized List<BoundObjectCounts> getBoundObjectCounts() {
        List<BoundObjectCounts> allCounts = new ArrayList<>();
        for (Map.Entry<ExtensionId, List<PythonProcess>> entry : processesByProcessorType.entrySet()) {
            ExtensionId extensionId = entry.getKey();
            for (PythonProcess process : entry.getValue()) {
                Map<String, Integer> counts = process.getJavaObjectBindingCounts();
                allCounts.add(new StandardBoundObjectCounts(process.toString(), extensionId.type(), extensionId.version(), counts));
            }
        }
        return allCounts;
    }

    /**
     * @throws IllegalStateException if {@link #start()} has not completed successfully or {@link #shutdown()} has been called since
     */
    private void ensureStarted() {
        if (!running) {
            throw new IllegalStateException("Cannot perform action because " + this + " is not currently running");
        }
    }

    /**
     * Marks the bridge as not running and shuts down every processor-hosting process followed by the
     * controller process (if one was created). The internal process and processor-count maps are not cleared.
     */
    public synchronized void shutdown() {
        logger.info("Shutting down Python Server");
        running = false;

        for (List<PythonProcess> processes : processesByProcessorType.values()) {
            for (PythonProcess process : processes) {
                process.shutdown();
            }
        }

        if (controllerProcess != null) {
            controllerProcess.shutdown();
        }
        logger.info("Successfully shutdown Python Server");
    }

    /**
     * Pings the controller process. NOTE(review): unlike most other operations this does not check
     * that the bridge is running, so it fails with a NullPointerException if called before
     * {@link #start()} has created the controller process.
     */
    public void ping() {
        controllerProcess.getCurrentController().ping();
    }

    public String toString() {
        return "StandardPythonBridge";
    }

    /**
     * Builds a new mutable list of absolute paths of the configured Python extension directories,
     * optionally followed by the NAR directories.
     */
    private List<String> getExtensionDirectoryPaths(boolean includeNarDirectories) {
        List<String> paths = processConfig.getPythonExtensionsDirectories().stream()
                .map(File::getAbsolutePath)
                .collect(Collectors.toCollection(ArrayList::new));
        if (includeNarDirectories) {
            paths.addAll(getNarDirectories());
        }
        return paths;
    }

    /** @return absolute paths of the directories currently returned by the NAR directory lookup */
    private Set<String> getNarDirectories() {
        return narDirectoryLookup.get().stream().map(File::getAbsolutePath).collect(Collectors.toSet());
    }

    /**
     * Looks up the extension id for the given type and version by querying the controller process
     * directly. Unlike {@link #getProcessorTypes()}, this does not check that the bridge is running.
     */
    private Optional<ExtensionId> findExtensionId(String type, String version) {
        List<PythonProcessorDetails> processorTypes = controllerProcess.getCurrentController().getProcessorTypes();
        return findProcessorDetails(processorTypes, type, version)
                .map(details -> new ExtensionId(details.getProcessorType(), details.getProcessorVersion()));
    }

    /** Returns the first entry whose processor type and version equal the given values. */
    private static Optional<PythonProcessorDetails> findProcessorDetails(List<PythonProcessorDetails> candidates, String type, String version) {
        return candidates.stream()
                .filter(details -> details.getProcessorType().equals(type))
                .filter(details -> details.getProcessorVersion().equals(version))
                .findFirst();
    }

    /** Map key identifying a processor by type name and version. */
    private record ExtensionId(String type, String version) {
    }
}

