/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.py4j;

import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.py4j.ProcessorCreationWorkflow;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardPythonProcessorBridge
implements PythonProcessorBridge {
    private static final Logger logger = LoggerFactory.getLogger(StandardPythonProcessorBridge.class);
    private final ProcessorCreationWorkflow creationWorkflow;
    private final String processorType;
    private final String processorVersion;
    private volatile PythonProcessorAdapter adapter;
    private final File workingDir;
    private final File moduleFile;
    private volatile long lastModified;
    private volatile AsyncLoadedProcessor.LoadState loadState = AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
    private volatile PythonProcessorInitializationContext initializationContext;
    private volatile String identifier;
    private volatile PythonController controller;
    private volatile CompletableFuture<Void> initializationFuture;

    private StandardPythonProcessorBridge(Builder builder) {
        this.controller = builder.controller;
        this.creationWorkflow = builder.creationWorkflow;
        this.processorType = builder.processorType;
        this.processorVersion = builder.processorVersion;
        this.workingDir = builder.workDir;
        this.moduleFile = builder.moduleFile;
        this.lastModified = this.moduleFile.lastModified();
    }

    public Optional<PythonProcessorAdapter> getProcessorAdapter() {
        return Optional.ofNullable(this.adapter);
    }

    public void initialize(PythonProcessorInitializationContext context) {
        if (this.initializationFuture != null) {
            this.initializationFuture.cancel(true);
        }
        this.initializationContext = context;
        this.identifier = context.getIdentifier();
        CompletableFuture future = new CompletableFuture();
        this.initializationFuture = future;
        String threadName = "Initialize Python Processor %s (%s)".formatted(this.identifier, this.getProcessorType());
        Thread.ofVirtual().name(threadName).start(() -> this.initializePythonSide(true, future));
    }

    public void replaceController(PythonController controller) {
        if (this.initializationFuture != null) {
            this.initializationFuture.cancel(true);
        }
        this.controller = controller;
        this.adapter = null;
        CompletableFuture future = new CompletableFuture();
        this.initializationFuture = future;
        String threadName = "Re-Initialize Python Processor %s (%s)".formatted(this.identifier, this.getProcessorType());
        Thread.ofVirtual().name(threadName).start(() -> this.initializePythonSide(true, future));
    }

    public AsyncLoadedProcessor.LoadState getLoadState() {
        return this.loadState;
    }

    private void initializePythonSide(boolean continualRetry, CompletableFuture<Void> future) {
        if (this.initializationContext == null) {
            future.complete(null);
            return;
        }
        long sleepMillis = 1000L;
        while (!future.isCancelled()) {
            boolean packagedWithDependencies = this.creationWorkflow.isPackagedWithDependencies();
            if (packagedWithDependencies) {
                this.loadState = AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
                break;
            }
            this.loadState = AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
            try {
                this.creationWorkflow.downloadDependencies();
                logger.info("Successfully downloaded dependencies for Python Processor {} ({})", (Object)this.identifier, (Object)this.getProcessorType());
                break;
            }
            catch (Exception e) {
                this.loadState = AsyncLoadedProcessor.LoadState.DEPENDENCY_DOWNLOAD_FAILED;
                if (!continualRetry) {
                    throw e;
                }
                sleepMillis = Math.min(sleepMillis * 2L, TimeUnit.MINUTES.toMillis(10L));
                logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", new Object[]{this.identifier, this.getProcessorType(), sleepMillis, e});
                try {
                    Thread.sleep(sleepMillis);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    e.addSuppressed(ex);
                    throw e;
                }
            }
        }
        while (!future.isCancelled()) {
            this.loadState = AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
            try {
                PythonProcessorAdapter pythonProcessorAdapter = this.creationWorkflow.createProcessor();
                pythonProcessorAdapter.initialize(this.initializationContext);
                this.adapter = pythonProcessorAdapter;
                this.loadState = AsyncLoadedProcessor.LoadState.FINISHED_LOADING;
                logger.info("Successfully loaded Python Processor {} ({})", (Object)this.identifier, (Object)this.getProcessorType());
                break;
            }
            catch (Exception e) {
                this.loadState = AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE_FAILED;
                if (!continualRetry) {
                    throw e;
                }
                sleepMillis = Math.min(sleepMillis * 2L, TimeUnit.MINUTES.toMillis(10L));
                logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", new Object[]{this.identifier, this.getProcessorType(), sleepMillis, e});
                try {
                    Thread.sleep(sleepMillis);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    e.addSuppressed(ex);
                    throw e;
                }
            }
        }
        future.complete(null);
    }

    public String getProcessorType() {
        return this.processorType;
    }

    public boolean reload() {
        if (this.moduleFile.lastModified() <= this.lastModified) {
            logger.debug("Processor {} has not been modified since it was last loaded so will not reload", (Object)this.getProcessorType());
            return false;
        }
        this.controller.reloadProcessor(this.getProcessorType(), this.processorVersion, this.workingDir.getAbsolutePath());
        this.initializePythonSide(false, new CompletableFuture<Void>());
        this.lastModified = this.moduleFile.lastModified();
        return true;
    }

    public static class Builder {
        private PythonController controller;
        private ProcessorCreationWorkflow creationWorkflow;
        private File workDir;
        private File moduleFile;
        private String processorType;
        private String processorVersion;

        public Builder controller(PythonController controller) {
            this.controller = controller;
            return this;
        }

        public Builder creationWorkflow(ProcessorCreationWorkflow creationWorkflow) {
            this.creationWorkflow = creationWorkflow;
            return this;
        }

        public Builder processorType(String processorType) {
            this.processorType = processorType;
            return this;
        }

        public Builder processorVersion(String processorVersion) {
            this.processorVersion = processorVersion;
            return this;
        }

        public Builder workingDirectory(File workDir) {
            this.workDir = workDir;
            return this;
        }

        public Builder moduleFile(File moduleFile) {
            this.moduleFile = moduleFile;
            return this;
        }

        public StandardPythonProcessorBridge build() {
            if (this.controller == null) {
                throw new IllegalStateException("Must specify the PythonController");
            }
            if (this.creationWorkflow == null) {
                throw new IllegalStateException("Must specify the Processor Creation Workflow");
            }
            if (this.processorType == null) {
                throw new IllegalStateException("Must specify the Processor Type");
            }
            if (this.processorVersion == null) {
                throw new IllegalStateException("Must specify the Processor Version");
            }
            if (this.workDir == null) {
                throw new IllegalStateException("Must specify the Working Directory");
            }
            if (this.moduleFile == null) {
                throw new IllegalStateException("Must specify the Module File");
            }
            return new StandardPythonProcessorBridge(this);
        }
    }
}

