/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.py4j;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.py4j.ProcessorCreationWorkflow;
import org.apache.nifi.py4j.PythonProcessLogReader;
import org.apache.nifi.py4j.StandardPythonProcessorBridge;
import org.apache.nifi.py4j.client.JavaObjectBindings;
import org.apache.nifi.py4j.client.NiFiPythonGateway;
import org.apache.nifi.py4j.client.StandardPythonClient;
import org.apache.nifi.py4j.logging.LogLevelChangeListener;
import org.apache.nifi.py4j.logging.PythonLogLevel;
import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler;
import org.apache.nifi.py4j.server.NiFiGatewayServer;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.CallbackClient;
import py4j.GatewayServer;

public class PythonProcess {
    private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
    private static final String PYTHON_CONTROLLER_FILENAME = "Controller.py";
    private static final String LOG_READER_THREAD_NAME_FORMAT = "python-log-%d";
    private final PythonProcessConfig processConfig;
    private final ControllerServiceTypeLookup controllerServiceTypeLookup;
    private final File virtualEnvHome;
    private final boolean packagedWithDependencies;
    private final String componentType;
    private final String componentId;
    private GatewayServer server;
    private PythonController controller;
    private Process process;
    private NiFiPythonGateway gateway;
    private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap<String, Boolean>();
    private final Set<CreatedProcessor> createdProcessors = new CopyOnWriteArraySet<CreatedProcessor>();
    private volatile boolean shutdown = false;
    private volatile List<String> extensionDirs;
    private volatile String workDir;
    private Thread logReaderThread;
    private String logListenerId;

    public PythonProcess(PythonProcessConfig processConfig, ControllerServiceTypeLookup controllerServiceTypeLookup, File virtualEnvHome, boolean packagedWithDependencies, String componentType, String componentId) {
        this.processConfig = processConfig;
        this.controllerServiceTypeLookup = controllerServiceTypeLookup;
        this.virtualEnvHome = virtualEnvHome;
        this.packagedWithDependencies = packagedWithDependencies;
        this.componentType = componentType;
        this.componentId = componentId;
    }

    PythonController getCurrentController() {
        return this.controller;
    }

    public synchronized void start() throws IOException {
        ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
        SocketFactory socketFactory = SocketFactory.getDefault();
        int timeoutMillis = (int)this.processConfig.getCommsTimeout().toMillis();
        String authToken = this.generateAuthToken();
        CallbackClient callbackClient = new CallbackClient(25334, GatewayServer.defaultAddress(), authToken, 50000L, TimeUnit.MILLISECONDS, socketFactory, false, timeoutMillis);
        JavaObjectBindings bindings = new JavaObjectBindings();
        this.gateway = new NiFiPythonGateway(bindings, null, callbackClient);
        this.gateway.startup();
        this.server = new NiFiGatewayServer(this.gateway, 0, GatewayServer.defaultAddress(), timeoutMillis, timeoutMillis, Collections.emptyList(), serverSocketFactory, authToken, this.componentType, this.componentId);
        this.server.start();
        int listeningPort = this.server.getListeningPort();
        this.setupEnvironment();
        this.process = this.launchPythonProcess(listeningPort, authToken);
        this.process.onExit().thenAccept(this::handlePythonProcessDied);
        String logReaderThreadName = LOG_READER_THREAD_NAME_FORMAT.formatted(this.process.pid());
        PythonProcessLogReader logReaderCommand = new PythonProcessLogReader(this.process.inputReader(StandardCharsets.UTF_8));
        this.logReaderThread = Thread.ofVirtual().name(logReaderThreadName).start(logReaderCommand);
        StandardPythonClient pythonClient = new StandardPythonClient(this.gateway);
        this.controller = pythonClient.getController();
        long timeout = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
        Exception lastException = null;
        boolean pingSuccessful = false;
        while (System.currentTimeMillis() < timeout) {
            try {
                String pingResponse = this.controller.ping();
                pingSuccessful = "pong".equals(pingResponse);
                if (pingSuccessful) break;
                logger.debug("Got unexpected response from Py4J Server during ping: {}", (Object)pingResponse);
            }
            catch (Exception e) {
                lastException = e;
                logger.debug("Failed to start Py4J Server", (Throwable)e);
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (!pingSuccessful && lastException != null) {
            throw new RuntimeException("Failed to start Python Bridge", lastException);
        }
        this.logListenerId = Long.toString(this.process.pid());
        StandardLogLevelChangeHandler.getHandler().addListener(this.logListenerId, new PythonProcessLogLevelChangeListener());
        this.controller.setControllerServiceTypeLookup(this.controllerServiceTypeLookup);
        logger.info("Successfully started and pinged Python Server. Python Process = {}", (Object)this.process);
    }

    private void handlePythonProcessDied(Process process) {
        if (this.isShutdown()) {
            logger.info("Python Process {} exited with code {}", (Object)process, (Object)process.exitValue());
            return;
        }
        List<String> processorsInvolved = this.createdProcessors.stream().map(coordinates -> "%s (%s)".formatted(coordinates.identifier(), coordinates.type())).toList();
        logger.error("Python Process {} with Processors {} died unexpectedly with exit code {}. Restarting...", new Object[]{process, processorsInvolved, process.exitValue()});
        long backoff = 1000L;
        while (!this.isShutdown()) {
            try {
                this.killProcess();
                this.start();
                if (this.extensionDirs != null && this.workDir != null) {
                    this.discoverExtensions(this.extensionDirs, this.workDir);
                    this.recreateProcessors();
                }
                return;
            }
            catch (Exception e) {
                logger.error("Failed to restart Python Process with Processors {}; will keep trying", processorsInvolved, (Object)e);
                try {
                    Thread.sleep(backoff);
                    backoff = Math.min(60000L, backoff * 2L);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ex);
                }
            }
        }
    }

    private String generateAuthToken() {
        SecureRandom random = new SecureRandom();
        byte[] bytes = new byte[20];
        random.nextBytes(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }

    private boolean isPackagedWithDependencies() {
        return this.packagedWithDependencies;
    }

    private Process launchPythonProcess(int listeningPort, String authToken) throws IOException {
        File pythonFrameworkDirectory = this.processConfig.getPythonFrameworkDirectory();
        File pythonApiDirectory = new File(pythonFrameworkDirectory.getParentFile(), "api");
        String pythonCommand = this.resolvePythonCommand();
        File controllerPyFile = new File(pythonFrameworkDirectory, PYTHON_CONTROLLER_FILENAME);
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        ArrayList<String> commands = new ArrayList<String>();
        commands.add(pythonCommand);
        if (this.isPackagedWithDependencies()) {
            commands.add("-S");
        }
        StringBuilder pythonPath = new StringBuilder(pythonApiDirectory.getAbsolutePath());
        String absolutePath = this.virtualEnvHome.getAbsolutePath();
        pythonPath.append(File.pathSeparator).append(absolutePath);
        if (this.isPackagedWithDependencies()) {
            File dependenciesDir = new File(new File(absolutePath), "NAR-INF/bundled-dependencies");
            pythonPath.append(File.pathSeparator).append(dependenciesDir.getAbsolutePath());
        }
        if (this.processConfig.isDebugController() && "Controller".equals(this.componentId)) {
            commands.add("-m");
            commands.add("debugpy");
            commands.add("--listen");
            commands.add(this.processConfig.getDebugHost() + ":" + this.processConfig.getDebugPort());
            commands.add("--log-to-stderr");
            pythonPath.append(File.pathSeparator).append(this.virtualEnvHome.getAbsolutePath());
        }
        commands.add(controllerPyFile.getAbsolutePath());
        processBuilder.command(commands);
        processBuilder.environment().put("JAVA_PORT", String.valueOf(listeningPort));
        processBuilder.environment().put("ENV_HOME", this.virtualEnvHome.getAbsolutePath());
        processBuilder.environment().put("PYTHONPATH", pythonPath.toString());
        processBuilder.environment().put("PYTHON_CMD", pythonCommand);
        processBuilder.environment().put("AUTH_TOKEN", authToken);
        processBuilder.redirectErrorStream(true);
        logger.info("Launching Python Process {} {} with working directory {} to communicate with Java on Port {}", new Object[]{pythonCommand, controllerPyFile.getAbsolutePath(), this.virtualEnvHome, listeningPort});
        return processBuilder.start();
    }

    String resolvePythonCommand() throws IOException {
        if (this.isPackagedWithDependencies()) {
            return this.processConfig.getPythonCommand();
        }
        File pythonCmdFile = new File(this.processConfig.getPythonCommand());
        String pythonCmd = pythonCmdFile.getName();
        File[] virtualEnvDirectories = this.virtualEnvHome.listFiles((file, name) -> file.isDirectory() && (name.equals("bin") || name.equals("Scripts")));
        if (virtualEnvDirectories == null || virtualEnvDirectories.length == 0) {
            throw new IOException("Python binary directory could not be found in " + String.valueOf(this.virtualEnvHome));
        }
        String commandExecutableDirectory = virtualEnvDirectories.length == 1 ? virtualEnvDirectories[0].getName() : this.findExecutableDirectory(pythonCmd, virtualEnvDirectories);
        File pythonCommandFile = new File(this.virtualEnvHome, commandExecutableDirectory + File.separator + pythonCmd);
        return pythonCommandFile.getAbsolutePath();
    }

    String findExecutableDirectory(String pythonCmd, File[] virtualEnvDirectories) throws IOException {
        return List.of(virtualEnvDirectories).stream().filter(file -> ArrayUtils.isNotEmpty((Object[])file.list((dir, name) -> name.startsWith(pythonCmd)))).findFirst().orElseThrow(() -> new IOException("Failed to find Python command [%s]".formatted(pythonCmd))).getName();
    }

    private void setupEnvironment() throws IOException {
        int result;
        if (this.isPackagedWithDependencies()) {
            logger.debug("Will not create Python Virtual Environment because Python Processor packaged with dependencies");
            return;
        }
        File environmentCreationCompleteFile = new File(this.virtualEnvHome, "env-creation-complete.txt");
        if (environmentCreationCompleteFile.exists()) {
            logger.debug("Environment has already been created for {}; will not recreate", (Object)this.virtualEnvHome);
            return;
        }
        logger.info("Creating Python Virtual Environment {}", (Object)this.virtualEnvHome);
        Files.createDirectories(this.virtualEnvHome.toPath(), new FileAttribute[0]);
        String pythonCommand = this.processConfig.getPythonCommand();
        String environmentPath = this.virtualEnvHome.getAbsolutePath();
        ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "venv", environmentPath);
        processBuilder.directory(this.virtualEnvHome.getParentFile());
        String command = String.join((CharSequence)" ", processBuilder.command());
        logger.debug("Creating Python Virtual Environment {} using command {}", (Object)this.virtualEnvHome, (Object)command);
        Process process = processBuilder.start();
        try {
            result = process.waitFor();
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for Python virtual environment to be created");
        }
        if (result != 0) {
            throw new IOException("Failed to create Python Environment " + String.valueOf(this.virtualEnvHome) + ": process existed with code " + result);
        }
        if (this.processConfig.isDebugController() && "Controller".equals(this.componentId)) {
            this.installDebugPy();
        }
        environmentCreationCompleteFile.createNewFile();
        logger.info("Successfully created Python Virtual Environment {}", (Object)this.virtualEnvHome);
    }

    private void installDebugPy() throws IOException {
        int result;
        String pythonCommand = this.processConfig.getPythonCommand();
        ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--no-cache-dir", "--upgrade", "debugpy", "--target", this.virtualEnvHome.getAbsolutePath());
        processBuilder.directory(this.virtualEnvHome);
        String command = String.join((CharSequence)" ", processBuilder.command());
        logger.debug("Installing DebugPy to Virtual Env {} using command {}", (Object)this.virtualEnvHome, (Object)command);
        Process process = processBuilder.start();
        try {
            result = process.waitFor();
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for DebugPy to be installed");
        }
        if (result != 0) {
            throw new IOException("Failed to install DebugPy for Python Environment " + String.valueOf(this.virtualEnvHome) + ": process existed with code " + result);
        }
    }

    public boolean isShutdown() {
        return this.shutdown;
    }

    public void shutdown() {
        this.shutdown = true;
        logger.info("Shutting down Python Process {}", (Object)this.process);
        this.killProcess();
    }

    private synchronized void killProcess() {
        if (this.logListenerId != null) {
            StandardLogLevelChangeHandler.getHandler().removeListener(this.logListenerId);
        }
        if (this.server != null) {
            try {
                this.server.shutdown();
            }
            catch (Exception e) {
                logger.error("Failed to cleanly shutdown Py4J server", (Throwable)e);
            }
            this.server = null;
        }
        if (this.gateway != null) {
            try {
                this.gateway.shutdown(true);
            }
            catch (Exception e) {
                logger.error("Failed to cleanly shutdown Py4J Gateway", (Throwable)e);
            }
            this.gateway = null;
        }
        if (this.process != null) {
            try {
                this.process.destroyForcibly();
            }
            catch (Exception e) {
                logger.error("Failed to cleanly shutdown Py4J process", (Throwable)e);
            }
            this.process = null;
        }
        if (this.logReaderThread != null) {
            this.logReaderThread.interrupt();
        }
    }

    public void discoverExtensions(List<String> directories, String workDirectory) {
        this.extensionDirs = new ArrayList<String>(directories);
        this.workDir = workDirectory;
        this.controller.discoverExtensions(directories, workDirectory);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PythonProcessorBridge createProcessor(String identifier, final String type, final String version, final String workDirPath, boolean prefersIsolation) {
        ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow(){

            @Override
            public boolean isPackagedWithDependencies() {
                return PythonProcess.this.packagedWithDependencies;
            }

            @Override
            public void downloadDependencies() {
                if (PythonProcess.this.packagedWithDependencies) {
                    return;
                }
                PythonProcess.this.controller.downloadDependencies(type, version, workDirPath);
            }

            @Override
            public PythonProcessorAdapter createProcessor() {
                return PythonProcess.this.controller.createProcessor(type, version, workDirPath);
            }
        };
        PythonProcessorDetails processorDetails = this.controller.getProcessorDetails(type, version);
        try {
            String processorType = processorDetails.getProcessorType();
            String processorVersion = processorDetails.getProcessorVersion();
            StandardPythonProcessorBridge processorBridge = new StandardPythonProcessorBridge.Builder().controller(this.controller).creationWorkflow(creationWorkflow).processorType(processorType).processorVersion(processorVersion).workingDirectory(this.processConfig.getPythonWorkingDirectory()).moduleFile(new File(this.controller.getModuleFile(type, version))).build();
            CreatedProcessor createdProcessor = new CreatedProcessor(identifier, type, processorBridge);
            this.createdProcessors.add(createdProcessor);
            this.processorPrefersIsolation.put(identifier, prefersIsolation);
            StandardPythonProcessorBridge standardPythonProcessorBridge = processorBridge;
            return standardPythonProcessorBridge;
        }
        finally {
            processorDetails.free();
        }
    }

    private void recreateProcessors() {
        for (CreatedProcessor createdProcessor : this.createdProcessors) {
            createdProcessor.processorBridge().replaceController(this.controller);
            logger.info("Recreated Processor {} ({}) in Python Process {}", new Object[]{createdProcessor.identifier(), createdProcessor.type(), this.process});
        }
    }

    public boolean containsIsolatedProcessor() {
        return this.processorPrefersIsolation.containsValue(Boolean.TRUE);
    }

    public boolean removeProcessor(String identifier) {
        CreatedProcessor matchingProcessor = this.createdProcessors.stream().filter(createdProcessor -> createdProcessor.identifier().equals(identifier)).findFirst().orElse(null);
        if (matchingProcessor == null) {
            return false;
        }
        this.createdProcessors.remove(matchingProcessor);
        this.processorPrefersIsolation.remove(identifier);
        return true;
    }

    public int getProcessorCount() {
        return this.processorPrefersIsolation.size();
    }

    public Map<String, Integer> getJavaObjectBindingCounts() {
        return this.gateway.getObjectBindings().getCountsPerClass();
    }

    private class PythonProcessLogLevelChangeListener
    implements LogLevelChangeListener {
        private PythonProcessLogLevelChangeListener() {
        }

        @Override
        public void onLevelChange(String loggerName, LogLevel logLevel) {
            PythonLogLevel pythonLogLevel = PythonLogLevel.valueOf(logLevel);
            PythonProcess.this.controller.setLoggerLevel(loggerName, pythonLogLevel.getLevel());
        }
    }

    private record CreatedProcessor(String identifier, String type, PythonProcessorBridge processorBridge) {
    }
}

