/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.websocket.jetty;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextProvider;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.apache.nifi.websocket.jetty.AbstractJettyWebSocketService;
import org.apache.nifi.websocket.jetty.RoutingWebSocketListener;
import org.apache.nifi.websocket.jetty.dto.SessionInfo;
import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.ProxyConfiguration;
import org.eclipse.jetty.client.transport.HttpClientTransportDynamic;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

@Tags(value={"WebSocket", "Jetty", "client"})
@CapabilityDescription(value="Implementation of WebSocketClientService. This service uses Jetty WebSocket client module to provide WebSocket session management throughout the application.")
public class JettyWebSocketClient
extends AbstractJettyWebSocketService
implements WebSocketClientService {
    /**
     * The WebSocket endpoint this client connects to.
     *
     * <p>Supports Expression Language evaluated against FlowFile attributes. When the value
     * contains Expression Language the scheme cannot be checked up front, so the scheme
     * validator accepts it; otherwise the value must start with {@code ws://} or {@code wss://}.
     */
    public static final PropertyDescriptor WS_URI = new PropertyDescriptor.Builder()
            .name("websocket-uri")
            .displayName("WebSocket URI")
            .description("The WebSocket URI this client connects to.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.URI_VALIDATOR)
            .addValidator((subject, input, context) -> {
                // Validity is decided by exactly one of the two branches below; no default is preset.
                final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject);
                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
                    result.explanation("Expression Language Present").valid(true);
                } else {
                    final boolean webSocketScheme = input.startsWith("ws://") || input.startsWith("wss://");
                    result.explanation("Protocol should be either 'ws' or 'wss'.").valid(webSocketScheme);
                }
                return result.build();
            })
            .build();
    /** Time period to wait for a single connection attempt; read as milliseconds when the service is enabled. */
    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder().name("connection-timeout").displayName("Connection Timeout").description("The timeout to connect the WebSocket URI.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("3 sec").build();
    /**
     * Number of connection attempts made before a connect call fails.
     * NOTE(review): the property name is "connection-attempt-timeout" although the value is a count;
     * presumably the name is persisted in existing flows, so confirm before renaming.
     */
    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder().name("connection-attempt-timeout").displayName("Connection Attempt Count").description("The number of times to try and establish a connection.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("3").build();
    /** Interval used both as initial delay and period of the scheduled session maintenance task. */
    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder().name("session-maintenance-interval").displayName("Session Maintenance Interval").description("The interval between session maintenance activities. A WebSocket session established with a WebSocket server can be terminated due to different reasons including restarting the WebSocket server or timing out inactive sessions. This session maintenance activity is periodically executed in order to reconnect those lost sessions, so that a WebSocket client can reuse the same session id transparently after it reconnects successfully.  The maintenance activity is executed until corresponding processors or this controller service is stopped.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("10 sec").build();
    /** Optional user name; combined with {@link #USER_PASSWORD} to build a Basic Authorization header. */
    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder().name("user-name").displayName("User Name").description("The user name for Basic Authentication.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** Optional, sensitive password used together with {@link #USER_NAME}. */
    public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor.Builder().name("user-password").displayName("User Password").description("The user password for Basic Authentication.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).sensitive(true).build();
    /** Charset used to turn "user:password" into bytes before Base64 encoding; defaults to US-ASCII. */
    public static final PropertyDescriptor AUTH_CHARSET = new PropertyDescriptor.Builder().name("authentication-charset").displayName("Authentication Header Charset").description("The charset for Basic Authentication header base64 string.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).defaultValue("US-ASCII").build();
    /** Optional, sensitive Authorization header value sent as-is; validation rejects combining it with Basic Authentication properties. */
    public static final PropertyDescriptor CUSTOM_AUTH = new PropertyDescriptor.Builder().name("custom-authorization").displayName("Custom Authorization").description("Configures a custom HTTP Authorization Header as described in RFC 7235 Section 4.2. Setting a custom Authorization Header excludes configuring the User Name and User Password properties for Basic Authentication.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    /** Optional HTTP proxy host; validation requires {@link #PROXY_PORT} to be set as well. */
    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder().name("proxy-host").displayName("HTTP Proxy Host").description("The host name of the HTTP Proxy.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** Optional HTTP proxy port; validation requires {@link #PROXY_HOST} to be set as well. */
    public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder().name("proxy-port").displayName("HTTP Proxy Port").description("The port number of the HTTP Proxy.").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.PORT_VALIDATOR).build();
    /** Initial delay, in milliseconds, of the connection retry backoff. */
    private static final int INITIAL_BACKOFF_MILLIS = 100;
    /** Upper bound, in milliseconds, of the connection retry backoff. */
    private static final int MAXIMUM_BACKOFF_MILLIS = 3200;
    /** Descriptors inherited from the abstract service followed by the client-specific ones declared in this class. */
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(JettyWebSocketClient.getAbstractPropertyDescriptors().stream(), Stream.of(WS_URI, SSL_CONTEXT, CONNECTION_TIMEOUT, CONNECTION_ATTEMPT_COUNT, SESSION_MAINTENANCE_INTERVAL, USER_NAME, USER_PASSWORD, AUTH_CHARSET, CUSTOM_AUTH, PROXY_HOST, PROXY_PORT)).toList();
    /** Session info per client id; entries are added on successful connect and consulted by session maintenance. */
    private final Map<String, SessionInfo> activeSessions = new ConcurrentHashMap<String, SessionInfo>();
    /** Held while a connection is being established and while session maintenance runs. */
    private final ReentrantLock connectionLock = new ReentrantLock();
    /** Jetty client created when the service is enabled and set to null when it is stopped. */
    private WebSocketClient client;
    /** Most recently evaluated target URI; re-evaluated on every connect. */
    private URI webSocketUri;
    /** Maximum wait, in milliseconds, for one connection attempt. */
    private long connectionTimeoutMillis;
    /** Configured number of connection attempts per connect call. */
    private int connectCount;
    /** Scheduler running periodic session maintenance; null while the service is stopped. */
    private volatile ScheduledExecutorService sessionMaintenanceScheduler;
    /** Context captured at enable time so the URI can be re-evaluated per connect. */
    private ConfigurationContext configurationContext;
    /** Value sent in the HTTP Authorization header of upgrade requests, or null when none is configured. */
    protected String authorizationHeader;

    /**
     * Returns the property descriptors supported by this service.
     *
     * @return the shared list held in {@code PROPERTY_DESCRIPTORS}
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Builds and starts the Jetty WebSocket client and schedules periodic session maintenance.
     *
     * <p>All configuration values that can fail to parse (target URI, time periods, authorization
     * charset) are resolved before the client is started, so an invalid configuration does not
     * leave a running client behind.
     *
     * @param context configuration of this controller service
     * @throws Exception if the configuration is invalid or the Jetty client fails to start
     */
    @OnEnabled
    public void startClient(ConfigurationContext context) throws Exception {
        this.configurationContext = context;
        this.connectCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();

        // Resolve everything that may throw on bad input before any resource is started.
        final String initialUri = context.getProperty(WS_URI).evaluateAttributeExpressions(new HashMap<String, String>()).getValue();
        this.webSocketUri = new URI(initialUri);
        this.connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        final long maintenanceIntervalMillis = context.getProperty(SESSION_MAINTENANCE_INTERVAL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        this.authorizationHeader = this.resolveAuthorizationHeader(context);

        this.client = new WebSocketClient(this.newHttpClient(context));
        this.configurePolicy(context, this.client);
        this.client.start();
        this.activeSessions.clear();

        // The field is assigned before scheduling so stopClient() can always shut the executor down.
        this.sessionMaintenanceScheduler = Executors.newSingleThreadScheduledExecutor();
        this.sessionMaintenanceScheduler.scheduleAtFixedRate(() -> {
            try {
                this.maintainSessions();
            } catch (Exception e) {
                // Swallowing here keeps the periodic task alive; an escaping exception would cancel it.
                this.getLogger().warn("Failed to maintain sessions due to {}", new Object[]{e, e});
            }
        }, maintenanceIntervalMillis, maintenanceIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the HTTP client backing the WebSocket client, applying the optional SSL context
     * provider and the optional HTTP proxy.
     */
    private HttpClient newHttpClient(ConfigurationContext context) {
        final HttpClient httpClient;
        final SSLContextProvider sslContextProvider = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextProvider.class);
        if (sslContextProvider == null) {
            httpClient = new HttpClient();
        } else {
            final SSLContext sslContext = sslContextProvider.createContext();
            final SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
            sslContextFactory.setSslContext(sslContext);
            final ClientConnector clientConnector = new ClientConnector();
            clientConnector.setSslContextFactory(sslContextFactory);
            httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, new ClientConnectionFactory.Info[0]));
        }

        final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
        final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
        if (proxyHost != null && proxyPort != null) {
            httpClient.getProxyConfiguration().addProxy(new HttpProxy(proxyHost, proxyPort.intValue()));
        }
        return httpClient;
    }

    /**
     * Determines the Authorization header value: the custom value when configured, otherwise a
     * Basic header when both user name and password are present, otherwise {@code null}.
     */
    private String resolveAuthorizationHeader(ConfigurationContext context) {
        final String customAuth = context.getProperty(CUSTOM_AUTH).evaluateAttributeExpressions().getValue();
        if (!StringUtils.isEmpty(customAuth)) {
            return customAuth;
        }

        final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
        final String userPassword = context.getProperty(USER_PASSWORD).evaluateAttributeExpressions().getValue();
        if (StringUtils.isEmpty(userName) || StringUtils.isEmpty(userPassword)) {
            return null;
        }

        final String charsetName = context.getProperty(AUTH_CHARSET).evaluateAttributeExpressions().getValue();
        if (StringUtils.isEmpty(charsetName)) {
            throw new IllegalArgumentException(AUTH_CHARSET.getDisplayName() + " was not specified.");
        }
        final Charset charset = Charset.forName(charsetName);
        final String credentials = userName + ":" + userPassword;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(charset));
    }

    /**
     * Performs cross-property validation.
     *
     * <p>Two rules are checked: the HTTP proxy host and port must be set together, and the
     * Basic Authentication properties must not be combined with a custom Authorization header.
     *
     * @param validationContext context giving access to the configured properties
     * @return the validation failures found; empty when the configuration is consistent
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>(1);

        final boolean hasProxyHost = validationContext.getProperty(PROXY_HOST).isSet();
        final boolean hasProxyPort = validationContext.getProperty(PROXY_PORT).isSet();
        // Exactly one of the two being set is the invalid combination.
        if (hasProxyHost != hasProxyPort) {
            final ValidationResult proxyProblem = new ValidationResult.Builder()
                    .subject("HTTP Proxy Host and Port")
                    .valid(false)
                    .explanation("If HTTP Proxy Host or HTTP Proxy Port is set, both must be set")
                    .build();
            problems.add(proxyProblem);
        }

        final boolean basicAuthConfigured = validationContext.getProperty(USER_NAME).isSet()
                || validationContext.getProperty(USER_PASSWORD).isSet();
        if (basicAuthConfigured && validationContext.getProperty(CUSTOM_AUTH).isSet()) {
            final ValidationResult authProblem = new ValidationResult.Builder()
                    .subject("Authentication")
                    .valid(false)
                    .explanation("Properties related to Basic Authentication (\"User Name\" and \"User Password\") cannot be used together with \"Custom Authorization\"")
                    .build();
            problems.add(authProblem);
        }

        return problems;
    }

    /**
     * Stops session maintenance, forgets all active sessions and stops the Jetty client.
     *
     * <p>The scheduler is stopped first and with interruption, so a maintenance pass that is
     * waiting on a connection attempt or sleeping in backoff ends promptly instead of
     * reconnecting sessions against a client that is being shut down.
     *
     * @throws Exception if the Jetty client fails to stop; the client reference is released regardless
     */
    @OnDisabled
    @OnShutdown
    public void stopClient() throws Exception {
        final ScheduledExecutorService scheduler = this.sessionMaintenanceScheduler;
        this.sessionMaintenanceScheduler = null;
        if (scheduler != null) {
            try {
                scheduler.shutdownNow();
            } catch (Exception e) {
                this.getLogger().warn("Failed to shutdown session maintainer due to {}", new Object[]{e, e});
            }
        }

        // Cleared after the scheduler is stopped so maintenance has no further reason to reconnect.
        this.activeSessions.clear();

        final WebSocketClient stoppingClient = this.client;
        if (stoppingClient == null) {
            return;
        }
        try {
            stoppingClient.stop();
        } finally {
            // Drop the reference even when stop() fails so a later call does not retry on a broken client.
            this.client = null;
        }
    }

    /**
     * Connects the given client without FlowFile attributes.
     * Delegates with a null session id and an empty attribute map.
     *
     * @param clientId identifier used to look up the message router
     * @throws IOException if no connection could be established
     */
    public void connect(String clientId) throws IOException {
        this.connect(clientId, null, Collections.emptyMap());
    }

    /**
     * Connects the given client, using the FlowFile attributes to evaluate the target URI and
     * to populate the upgrade request headers.
     *
     * @param clientId identifier used to look up the message router
     * @param flowFileAttributes attributes of the triggering FlowFile; {@code null} is treated as empty
     * @throws IOException if no connection could be established
     */
    public void connect(String clientId, Map<String, String> flowFileAttributes) throws IOException {
        // Normalise null so the downstream isEmpty() check does not fail with a NullPointerException.
        final Map<String, String> attributes = flowFileAttributes == null
                ? Collections.<String, String>emptyMap()
                : flowFileAttributes;
        this.connect(clientId, null, attributes);
    }

    /**
     * Establishes a WebSocket session for the given client and records it as active.
     *
     * <p>The target URI is a field shared by all connection attempts, so it is re-evaluated only
     * while {@code connectionLock} is held. This prevents a concurrent caller from changing the
     * URI underneath another thread that is still retrying its own connection. The lock is
     * reentrant, so calls made from session maintenance (which already holds it) are fine.
     *
     * @param clientId identifier used to look up the message router
     * @param sessionId session id to reuse on the listener, or {@code null} for a new session
     * @param flowFileAttributes attributes used for URI evaluation and request headers
     * @throws IOException if all connection attempts fail
     * @throws ProcessException if the evaluated URI is syntactically invalid
     * @throws IllegalStateException if no router is registered for the client id
     */
    private void connect(String clientId, String sessionId, Map<String, String> flowFileAttributes) throws IOException {
        this.connectionLock.lock();
        try {
            final String uriValue = this.configurationContext.getProperty(WS_URI).evaluateAttributeExpressions(flowFileAttributes).getValue();
            try {
                this.webSocketUri = new URI(uriValue);
            } catch (URISyntaxException e) {
                throw new ProcessException("Could not create websocket URI", e);
            }

            final WebSocketMessageRouter router;
            try {
                router = this.routers.getRouterOrFail(clientId);
            } catch (WebSocketConfigurationException e) {
                throw new IllegalStateException("Failed to get router due to: " + e, e);
            }

            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
            listener.setSessionId(sessionId);

            final ClientUpgradeRequest request = new ClientUpgradeRequest();
            if (!flowFileAttributes.isEmpty()) {
                request.setHeaders(HeaderMapExtractor.getHeaderMap(flowFileAttributes));
            }
            // Set after the attribute-derived headers so the configured authorization takes effect.
            if (!StringUtils.isEmpty(this.authorizationHeader)) {
                request.setHeader(HttpHeader.AUTHORIZATION.asString(), this.authorizationHeader);
            }

            final Session session = this.attemptConnection(listener, request, this.connectCount);
            this.getLogger().info("Connected, session={}", new Object[]{session});
            this.activeSessions.put(clientId, new SessionInfo(listener.getSessionId(), flowFileAttributes));
        } finally {
            this.connectionLock.unlock();
        }
    }

    /**
     * Tries to open a WebSocket session, retrying with exponential backoff.
     *
     * <p>Each attempt waits at most {@code connectionTimeoutMillis}. An attempt that times out
     * or is interrupted is cancelled so it cannot complete later as an unowned session. Between
     * attempts the thread sleeps for the current backoff plus a jitter derived from the initial
     * backoff; the backoff doubles after every sleep up to {@code MAXIMUM_BACKOFF_MILLIS}.
     *
     * @param listener listener that receives the session events
     * @param request upgrade request carrying headers
     * @param connectCount maximum number of attempts
     * @return the established session
     * @throws IOException if every attempt failed
     * @throws ProcessException if the thread is interrupted while connecting or backing off
     */
    private Session attemptConnection(RoutingWebSocketListener listener, ClientUpgradeRequest request, int connectCount) throws IOException {
        int backoffMillis = INITIAL_BACKOFF_MILLIS;
        for (int attempt = 1; attempt <= connectCount; attempt++) {
            final Future<Session> pending = this.createWebsocketSession(listener, request);
            this.getLogger().info("Connecting to : {}", new Object[]{this.webSocketUri});
            try {
                return pending.get(this.connectionTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Abandon the attempt; otherwise it may still succeed after a newer attempt was started.
                pending.cancel(true);
                this.getLogger().warn("Connection attempt to {} timed out", new Object[]{this.webSocketUri});
            } catch (InterruptedException e) {
                pending.cancel(true);
                Thread.currentThread().interrupt();
                final String errorMessage = String.format("Thread was interrupted while attempting to connect to %s", this.webSocketUri);
                throw new ProcessException(errorMessage, e);
            } catch (Exception e) {
                this.getLogger().warn("Failed to connect to {}, reconnection attempt {}", new Object[]{this.webSocketUri, attempt, e});
            }

            if (attempt == connectCount) {
                // No point in sleeping after the final attempt.
                break;
            }

            final int backoffJitterMillis = (int) (INITIAL_BACKOFF_MILLIS * this.getBackoffJitter(-0.2, 0.2));
            final int sleepTime = backoffMillis + backoffJitterMillis;
            try {
                this.getLogger().info("Sleeping {} ms before new connection attempt.", new Object[]{sleepTime});
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                final String errorMessage = String.format("Thread was interrupted while reconnecting to %s with %s backoffMillis", this.webSocketUri, sleepTime);
                throw new ProcessException(errorMessage, e);
            }
            backoffMillis = Math.min(backoffMillis * 2, MAXIMUM_BACKOFF_MILLIS);
        }
        throw new IOException("Failed to connect " + this.webSocketUri + " after " + connectCount + " attempts");
    }

    /**
     * Starts an asynchronous connection to the current target URI.
     * Package-private, presumably so tests can override it — TODO confirm.
     *
     * @param listener listener that receives the session events
     * @param request upgrade request carrying headers
     * @return future completed by the client with the session
     * @throws IOException if the client cannot initiate the connection
     */
    Future<Session> createWebsocketSession(RoutingWebSocketListener listener, ClientUpgradeRequest request) throws IOException {
        return this.client.connect((Object)listener, this.webSocketUri, request);
    }

    /**
     * Reconnects sessions that are recorded as active but are no longer known to their router.
     *
     * <p>Client ids whose router is no longer registered are dropped from the active sessions.
     * Does nothing while the client is not started. A connection failure propagates to the
     * caller and ends the current pass.
     *
     * @throws Exception if reconnecting a session fails
     */
    void maintainSessions() throws Exception {
        if (this.client == null) {
            return;
        }

        // Obtained before locking so that nothing can throw between lock() and the try block.
        final ComponentLog logger = this.getLogger();

        this.connectionLock.lock();
        try {
            // Iterating entries gives a non-null value per key even if the map is cleared concurrently.
            for (final Map.Entry<String, SessionInfo> entry : this.activeSessions.entrySet()) {
                final String clientId = entry.getKey();
                final WebSocketMessageRouter router;
                try {
                    router = this.routers.getRouterOrFail(clientId);
                } catch (WebSocketConfigurationException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
                    }
                    this.activeSessions.remove(clientId);
                    continue;
                }

                final SessionInfo sessionInfo = entry.getValue();
                if (router.containsSession(sessionInfo.getSessionId())) {
                    continue;
                }
                // Session was lost: reconnect reusing the same session id and attributes.
                this.connect(clientId, sessionInfo.getSessionId(), sessionInfo.getFlowFileAttributes());
            }
        } finally {
            this.connectionLock.unlock();
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Session maintenance completed. activeSessions={}", new Object[]{this.activeSessions});
        }
    }

    /**
     * Returns the most recently evaluated target URI as a string.
     *
     * @return string form of the current WebSocket URI
     */
    public String getTargetUri() {
        final URI target = this.webSocketUri;
        return target.toString();
    }

    /**
     * Applies the configured buffer and message size limits to the WebSocket client.
     * All three sizes are read first, then applied, each converted to bytes and narrowed to int.
     *
     * @param context configuration of this controller service
     * @param policy client receiving the limits
     */
    private void configurePolicy(ConfigurationContext context, WebSocketClient policy) {
        final int bufferBytes = context.getProperty(INPUT_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final int textLimitBytes = context.getProperty(MAX_TEXT_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
        final int binaryLimitBytes = context.getProperty(MAX_BINARY_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();

        policy.setInputBufferSize(bufferBytes);
        // The message size setters take long; the int values widen implicitly.
        policy.setMaxTextMessageSize(textLimitBytes);
        policy.setMaxBinaryMessageSize(binaryLimitBytes);
    }

    /**
     * Produces a random factor between the given bounds.
     *
     * @param min lower bound of the factor
     * @param max upper bound of the factor
     * @return {@code min} plus a random fraction of the distance between {@code min} and {@code max}
     */
    public double getBackoffJitter(double min, double max) {
        final double span = max - min;
        return min + span * Math.random();
    }
}

