/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.amqp.processors.AMQPException;
import org.apache.nifi.amqp.processors.AMQPResource;
import org.apache.nifi.amqp.processors.AMQPRollbackException;
import org.apache.nifi.amqp.processors.AMQPWorker;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextProvider;

abstract class AbstractAMQPProcessor<T extends AMQPWorker>
extends AbstractProcessor {
    public static final String AMQP_APPID_ATTRIBUTE = "amqp$appId";
    public static final String AMQP_CONTENT_ENCODING_ATTRIBUTE = "amqp$contentEncoding";
    public static final String AMQP_CONTENT_TYPE_ATTRIBUTE = "amqp$contentType";
    public static final String AMQP_HEADERS_ATTRIBUTE = "amqp$headers";
    public static final String AMQP_DELIVERY_MODE_ATTRIBUTE = "amqp$deliveryMode";
    public static final String AMQP_PRIORITY_ATTRIBUTE = "amqp$priority";
    public static final String AMQP_CORRELATION_ID_ATTRIBUTE = "amqp$correlationId";
    public static final String AMQP_REPLY_TO_ATTRIBUTE = "amqp$replyTo";
    public static final String AMQP_EXPIRATION_ATTRIBUTE = "amqp$expiration";
    public static final String AMQP_MESSAGE_ID_ATTRIBUTE = "amqp$messageId";
    public static final String AMQP_TIMESTAMP_ATTRIBUTE = "amqp$timestamp";
    public static final String AMQP_TYPE_ATTRIBUTE = "amqp$type";
    public static final String AMQP_USER_ID_ATTRIBUTE = "amqp$userId";
    public static final String AMQP_CLUSTER_ID_ATTRIBUTE = "amqp$clusterId";
    public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder().name("Brokers").description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR).build();
    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder().name("Host Name").description("Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.").required(false).defaultValue("localhost").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").description("Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.").required(false).defaultValue("5672").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.PORT_VALIDATOR).build();
    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder().name("Virtual Host").description("Virtual Host name which segregates AMQP system for enhanced security.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder().name("Username").description("Username used for authentication and authorization.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("Password").description("Password used for authentication and authorization.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder().name("AMQP Version").description("AMQP Version. Currently only supports AMQP v0.9.1.").required(true).allowableValues(new String[]{"0.9.1"}).defaultValue("0.9.1").build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.").required(false).identifiesControllerService(SSLContextProvider.class).build();
    public static final PropertyDescriptor CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED = new PropertyDescriptor.Builder().name("Client Certificate Authentication Enabled").description("Authenticate using the SSL certificate rather than user name/password.").required(false).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    protected static final PropertyDescriptor MAX_INBOUND_MESSAGE_BODY_SIZE = new PropertyDescriptor.Builder().name("Max Inbound Message Body Size").description("Maximum body size of inbound (received) messages.").required(true).defaultValue("64 MB").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.createDataSizeBoundsValidator((long)1L, (long)Integer.MAX_VALUE)).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(BROKERS, HOST, PORT, V_HOST, USER, PASSWORD, AMQP_VERSION, SSL_CONTEXT_SERVICE, CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED);
    private BlockingQueue<AMQPResource<T>> resourceQueue;

    AbstractAMQPProcessor() {
    }

    protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void migrateProperties(PropertyConfiguration propertyConfiguration) {
        propertyConfiguration.removeProperty("ssl-client-auth");
        propertyConfiguration.renameProperty("User Name", USER.getName());
        propertyConfiguration.renameProperty("ssl-context-service", SSL_CONTEXT_SERVICE.getName());
        propertyConfiguration.renameProperty("cert-authentication", CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED.getName());
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.resourceQueue = new LinkedBlockingQueue<AMQPResource<T>>(context.getMaxConcurrentTasks());
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>(super.customValidate(context));
        boolean userConfigured = context.getProperty(USER).isSet();
        boolean passwordConfigured = context.getProperty(PASSWORD).isSet();
        boolean sslServiceConfigured = context.getProperty(SSL_CONTEXT_SERVICE).isSet();
        boolean useCertAuthentication = context.getProperty(CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED).asBoolean();
        if (useCertAuthentication && (userConfigured || passwordConfigured)) {
            results.add(new ValidationResult.Builder().subject("Authentication configuration").valid(false).explanation(String.format("'%s' with '%s' and '%s' cannot be configured at the same time", USER.getDisplayName(), PASSWORD.getDisplayName(), CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED.getDisplayName())).build());
        }
        if (!(useCertAuthentication || userConfigured && passwordConfigured)) {
            results.add(new ValidationResult.Builder().subject("Authentication configuration").valid(false).explanation(String.format("either '%s' with '%s' or '%s' must be configured", USER.getDisplayName(), PASSWORD.getDisplayName(), CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED.getDisplayName())).build());
        }
        if (useCertAuthentication && !sslServiceConfigured) {
            results.add(new ValidationResult.Builder().subject("SSL configuration").valid(false).explanation(String.format("'%s' has been set but no '%s' configured", CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED.getDisplayName(), SSL_CONTEXT_SERVICE.getDisplayName())).build());
        }
        return results;
    }

    public final void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        AMQPResource<T> resource = (AMQPResource<T>)this.resourceQueue.poll();
        if (resource == null) {
            try {
                resource = this.createResource(context);
            }
            catch (Exception e) {
                this.getLogger().error("Failed to initialize AMQP client", (Throwable)e);
                context.yield();
                return;
            }
        }
        try {
            this.processResource(resource.getConnection(), resource.getWorker(), context, session);
            if (!this.resourceQueue.offer(resource)) {
                this.getLogger().info("Worker queue is full, closing AMQP client");
                this.closeResource(resource);
            }
        }
        catch (AMQPException | AMQPRollbackException e) {
            this.getLogger().error("AMQP failure, dropping the client", (Throwable)e);
            context.yield();
            this.closeResource(resource);
        }
        catch (Exception e) {
            this.getLogger().error("Processor failure", (Throwable)e);
            context.yield();
        }
    }

    @OnStopped
    public void close() {
        if (this.resourceQueue != null) {
            AMQPResource resource;
            while ((resource = (AMQPResource)this.resourceQueue.poll()) != null) {
                this.closeResource(resource);
            }
            this.resourceQueue = null;
        }
    }

    private void closeResource(AMQPResource<T> resource) {
        try {
            resource.close();
        }
        catch (Exception e) {
            this.getLogger().error("Failed to close AMQP Connection", (Throwable)e);
        }
    }

    protected abstract void processResource(Connection var1, T var2, ProcessContext var3, ProcessSession var4) throws ProcessException;

    protected abstract T createAMQPWorker(ProcessContext var1, Connection var2);

    private AMQPResource<T> createResource(ProcessContext context) {
        Connection connection = null;
        try {
            ExecutorService executor = Executors.newSingleThreadExecutor((ThreadFactory)BasicThreadFactory.builder().namingPattern("AMQP Consumer: " + this.getIdentifier()).build());
            connection = this.createConnection(context, executor);
            T worker = this.createAMQPWorker(context, connection);
            return new AMQPResource<T>(connection, worker, executor);
        }
        catch (Exception e) {
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                }
                catch (Exception closingEx) {
                    this.getLogger().error("Failed to close AMQP Connection", (Throwable)closingEx);
                }
            }
            throw e;
        }
    }

    private Address[] createHostsList(ProcessContext context) {
        String evaluatedUrls = context.getProperty(BROKERS).evaluateAttributeExpressions().getValue();
        return Address.parseAddresses((String)evaluatedUrls);
    }

    protected Connection createConnection(ProcessContext context, ExecutorService executor) {
        String vHost;
        ConnectionFactory cf = new ConnectionFactory();
        cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
        cf.setPassword(context.getProperty(PASSWORD).getValue());
        if (context.getProperty(MAX_INBOUND_MESSAGE_BODY_SIZE).isSet()) {
            cf.setMaxInboundMessageBodySize(context.getProperty(MAX_INBOUND_MESSAGE_BODY_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue());
        }
        if ((vHost = context.getProperty(V_HOST).evaluateAttributeExpressions().getValue()) != null) {
            cf.setVirtualHost(vHost);
        }
        SSLContextProvider sslContextProvider = (SSLContextProvider)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
        Boolean useCertAuthentication = context.getProperty(CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED).asBoolean();
        if (sslContextProvider != null) {
            SSLContext sslContext = sslContextProvider.createContext();
            cf.useSslProtocol(sslContext);
            if (useCertAuthentication.booleanValue()) {
                cf.setSaslConfig((SaslConfig)DefaultSaslConfig.EXTERNAL);
            }
        }
        cf.setAutomaticRecoveryEnabled(false);
        cf.setExceptionHandler((ExceptionHandler)new DefaultExceptionHandler(){

            public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
                AbstractAMQPProcessor.this.getLogger().error("Connection lost to server {}:{}.", new Object[]{conn.getAddress(), conn.getPort(), exception});
            }
        });
        try {
            Connection connection;
            if (context.getProperty(BROKERS).isSet()) {
                Address[] hostsList = this.createHostsList(context);
                connection = cf.newConnection(executor, hostsList);
            } else {
                cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
                cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
                connection = cf.newConnection(executor);
            }
            return connection;
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Failed to establish connection with AMQP Broker: %s:%s", cf.getHost(), cf.getPort()), e);
        }
    }
}

