/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.python.processor;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.python.processor.PythonProcessor;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;

@SupportsBatching(defaultDuration=DefaultRunDuration.TWENTY_FIVE_MILLIS)
@SupportsSensitiveDynamicProperties
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="Python processors can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.")
public abstract class PythonProcessorProxy<T extends PythonProcessor>
extends AbstractProcessor
implements AsyncLoadedProcessor {
    private final String processorType;
    private volatile PythonProcessorInitializationContext initContext;
    private volatile PythonProcessorBridge bridge;
    private volatile Set<Relationship> cachedRelationships = null;
    private volatile List<PropertyDescriptor> cachedPropertyDescriptors = null;
    private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors = null;
    private volatile Boolean supportsDynamicProperties;
    private volatile T currentTransform;
    private volatile PythonProcessorAdapter currentAdapter;
    private volatile ProcessContext currentProcessContext;
    protected static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile will be routed to this relationship when it has been successfully transformed").autoTerminateDefault(true).build();
    protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("The original FlowFile will be routed to this relationship if it unable to be transformed for some reason").build();
    private static final Set<Relationship> implicitRelationships = Set.of(REL_ORIGINAL, REL_FAILURE);

    public PythonProcessorProxy(String processorType, Supplier<PythonProcessorBridge> bridgeFactory, boolean initialize) {
        this.processorType = processorType;
        Thread.ofVirtual().name("Initialize " + processorType).start(() -> {
            PythonProcessorInitializationContext pythonInitContext;
            this.bridge = (PythonProcessorBridge)bridgeFactory.get();
            if (initialize && (pythonInitContext = this.initContext) != null) {
                this.bridge.initialize(pythonInitContext);
            }
        });
    }

    protected void init(final ProcessorInitializationContext context) {
        PythonProcessorInitializationContext initContext;
        super.init(context);
        this.initContext = initContext = new PythonProcessorInitializationContext(){

            public String getIdentifier() {
                return context.getIdentifier();
            }

            public ComponentLog getLogger() {
                return context.getLogger();
            }
        };
        PythonProcessorBridge bridge = this.bridge;
        if (bridge != null) {
            bridge.initialize(initContext);
        }
    }

    protected synchronized T getTransform() {
        PythonProcessorBridge bridge = this.getBridge().orElseThrow(() -> new IllegalStateException(String.valueOf((Object)this) + " is not finished initializing"));
        Optional optionalAdapter = bridge.getProcessorAdapter();
        if (optionalAdapter.isEmpty()) {
            throw new IllegalStateException(String.valueOf((Object)this) + " is not finished initializing");
        }
        PythonProcessorAdapter adapter = (PythonProcessorAdapter)optionalAdapter.get();
        if (adapter != this.currentAdapter) {
            PythonProcessor transform = adapter.getProcessor();
            transform.setContext(this.currentProcessContext);
            this.currentTransform = transform;
            this.currentAdapter = adapter;
        }
        return this.currentTransform;
    }

    protected Optional<PythonProcessorBridge> getBridge() {
        return Optional.ofNullable(this.bridge);
    }

    public AsyncLoadedProcessor.LoadState getState() {
        if (this.bridge == null) {
            return AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
        }
        return this.bridge.getLoadState();
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        if (this.cachedPropertyDescriptors != null) {
            return this.cachedPropertyDescriptors;
        }
        if (this.getState() != AsyncLoadedProcessor.LoadState.FINISHED_LOADING) {
            return Collections.emptyList();
        }
        Optional optionalAdapter = this.bridge.getProcessorAdapter();
        if (optionalAdapter.isEmpty()) {
            return this.cachedPropertyDescriptors == null ? Collections.emptyList() : this.cachedPropertyDescriptors;
        }
        try {
            List properties;
            this.cachedPropertyDescriptors = properties = ((PythonProcessorAdapter)optionalAdapter.get()).getSupportedPropertyDescriptors();
            return properties;
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to obtain list of Property Descriptors from Python processor {}; returning cached list", new Object[]{this, e});
            List<PropertyDescriptor> properties = this.cachedPropertyDescriptors;
            return properties == null ? Collections.emptyList() : properties;
        }
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        if (this.bridge == null) {
            return List.of(new ValidationResult.Builder().subject("Processor").explanation("Python environment is not yet initialized").valid(false).build());
        }
        AsyncLoadedProcessor.LoadState loadState = this.bridge.getLoadState();
        if (loadState == AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE || loadState == AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES) {
            return List.of(new ValidationResult.Builder().subject("Processor").explanation("Processor has not yet completed initialization").valid(false).build());
        }
        try {
            this.reload();
            Optional optionalAdapter = this.bridge.getProcessorAdapter();
            if (optionalAdapter.isEmpty()) {
                return List.of(new ValidationResult.Builder().subject("Processor").explanation("Processor has not yet completed initialization").valid(false).build());
            }
            return ((PythonProcessorAdapter)optionalAdapter.get()).customValidate(validationContext);
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to perform validation for Python Processor {}; assuming invalid", new Object[]{this, e});
            return Collections.singleton(new ValidationResult.Builder().subject("Perform Validation").valid(false).explanation("Failed to trigger Python Processor to perform validation: " + String.valueOf(e)).build());
        }
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        if (!this.isSupportsDynamicPropertyDescriptor()) {
            return null;
        }
        if (this.cachedDynamicDescriptors != null) {
            return this.cachedDynamicDescriptors.get(propertyDescriptorName);
        }
        if (this.getState() != AsyncLoadedProcessor.LoadState.FINISHED_LOADING) {
            return null;
        }
        try {
            Optional optionalAdapter = this.bridge.getProcessorAdapter();
            return optionalAdapter.map(adapter -> adapter.getSupportedDynamicPropertyDescriptor(propertyDescriptorName)).orElse(null);
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to obtain Dynamic Property Descriptor with name {} from Python Processor {}; assuming property is not valid", new Object[]{propertyDescriptorName, this, e});
            return null;
        }
    }

    protected boolean isSupportsDynamicPropertyDescriptor() {
        if (this.supportsDynamicProperties != null) {
            return this.supportsDynamicProperties;
        }
        if (this.getState() != AsyncLoadedProcessor.LoadState.FINISHED_LOADING) {
            return false;
        }
        Optional adapter = this.bridge.getProcessorAdapter();
        boolean supported = adapter.map(PythonProcessorAdapter::isDynamicPropertySupported).orElse(false);
        this.supportsDynamicProperties = supported;
        return supported;
    }

    private void cacheRelationships() {
        Set<Relationship> relationships = this.fetchRelationshipsFromPythonProcessor();
        this.cachedRelationships = Set.copyOf(relationships);
    }

    private void cacheDynamicPropertyDescriptors(ProcessContext context) {
        HashMap<String, PropertyDescriptor> dynamicDescriptors = new HashMap<String, PropertyDescriptor>();
        Set descriptors = context.getProperties().keySet();
        for (PropertyDescriptor descriptor : descriptors) {
            if (!descriptor.isDynamic()) continue;
            dynamicDescriptors.put(descriptor.getName(), descriptor);
        }
        this.cachedDynamicDescriptors = dynamicDescriptors;
    }

    public Set<Relationship> getRelationships() {
        Set<Relationship> cached = this.cachedRelationships;
        if (cached != null) {
            return cached;
        }
        return this.fetchRelationshipsFromPythonProcessor();
    }

    private Set<Relationship> fetchRelationshipsFromPythonProcessor() {
        HashSet<Relationship> processorRelationships;
        if (this.getState() != AsyncLoadedProcessor.LoadState.FINISHED_LOADING) {
            return Collections.emptySet();
        }
        try {
            processorRelationships = this.bridge.getProcessorAdapter().map(PythonProcessorAdapter::getRelationships).orElseGet(HashSet::new);
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to obtain list of Relationships from Python Processor {}; assuming no explicit relationships", new Object[]{this, e});
            processorRelationships = new HashSet<Relationship>();
        }
        processorRelationships.addAll(this.getImplicitRelationships());
        return processorRelationships;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.currentProcessContext = context;
        if (this.bridge == null) {
            throw new IllegalStateException("Processor is not yet initialized");
        }
        this.reload();
        PythonProcessorAdapter adapter = (PythonProcessorAdapter)this.bridge.getProcessorAdapter().orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"));
        adapter.onScheduled(context);
        adapter.getProcessor().setContext(context);
        this.cacheRelationships();
        this.cacheDynamicPropertyDescriptors(context);
    }

    @OnStopped
    public void onStopped(ProcessContext context) {
        if (this.bridge == null) {
            throw new IllegalStateException("Processor is not yet initialized");
        }
        ((PythonProcessorAdapter)this.bridge.getProcessorAdapter().orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))).onStopped(context);
    }

    public String toString() {
        return "PythonProcessor[type=" + this.processorType + ", id=" + this.getIdentifier() + "]";
    }

    private void reload() {
        if (this.bridge == null) {
            return;
        }
        boolean reloaded = this.bridge.reload();
        if (reloaded) {
            this.getLogger().info("Successfully reloaded Processor");
        }
        this.cachedPropertyDescriptors = null;
        this.cachedRelationships = null;
        this.supportsDynamicProperties = ((PythonProcessorAdapter)this.bridge.getProcessorAdapter().orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))).isDynamicPropertySupported();
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        this.cachedPropertyDescriptors = null;
        this.cachedRelationships = null;
        super.onPropertyModified(descriptor, oldValue, newValue);
    }

    protected Set<Relationship> getImplicitRelationships() {
        return implicitRelationships;
    }
}

