/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.listen.ListenComponent;
import org.apache.nifi.components.listen.ListenPort;
import org.apache.nifi.components.listen.StandardListenPort;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.SslSessionStatus;
import org.apache.nifi.event.transport.configuration.BufferAllocator;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.queue.TrackingLinkedBlockingQueue;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextProvider;

/**
 * Processor that listens for TCP connections on a configured port, splits the
 * incoming byte stream on a configurable message delimiter and writes the
 * resulting messages to FlowFiles routed to {@link #REL_SUCCESS}.
 *
 * <p>Received messages are queued in {@link #events} by the event server created in
 * {@link #onScheduled(ProcessContext)} and drained in batches, keyed by sender, each
 * time {@link #onTrigger(ProcessContext, ProcessSession)} runs.</p>
 */
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "tcp", "tls", "ssl"})
@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then the Receive Buffer Size must be greater than 100kb. The processor can be configured to use an SSL Context Service to only allow secure connections. When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's issuer and subject are added to the outgoing FlowFiles as attributes. The processor does not perform authorization based on Distinguished Name values, but since these values are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
@WritesAttributes({
        @WritesAttribute(attribute = "tcp.sender", description = "The sending host of the messages."),
        @WritesAttribute(attribute = "tcp.port", description = "The sending port the messages were received."),
        @WritesAttribute(attribute = "client.certificate.issuer.dn", description = "For connections using mutual TLS, the Distinguished Name of the Certificate Authority that issued the client's certificate is attached to the FlowFile."),
        @WritesAttribute(attribute = "client.certificate.subject.dn", description = "For connections using mutual TLS, the Distinguished Name of the client certificate's owner (subject) is attached to the FlowFile.")
})
public class ListenTCP
extends AbstractProcessor
implements ListenComponent {
    private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
    private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";

    /** Port the processor binds to; the fully-qualified protocol type avoids a clash with the imported event-transport {@code TransportProtocol}. */
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Port")
            .description("The port to listen on for TCP connections.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .identifiesListenPort(org.apache.nifi.components.listen.TransportProtocol.TCP, new String[0])
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .build();

    /** Optional controller service supplying the SSL context; when set, TLS is configured on the event server factory. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
            .required(false)
            .identifiesControllerService(SSLContextProvider.class)
            .build();

    /** Client authentication policy; declared as dependent on {@link #SSL_CONTEXT_SERVICE}. */
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
            .name("Client Auth")
            .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
            .required(true)
            .allowableValues(ClientAuth.values())
            .defaultValue(ClientAuth.REQUIRED.name())
            .dependsOn(SSL_CONTEXT_SERVICE, new AllowableValue[0])
            .build();

    /** Selects between the pooled and unpooled {@link BufferAllocator} in {@link #onScheduled(ProcessContext)}. */
    protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
            .name("Pool Receive Buffers")
            .description("Enable or disable pooling of buffers that the processor uses for handling bytes received on socket connections. The framework allocates buffers as needed during processing.")
            .required(true)
            .defaultValue("True")
            .allowableValues("True", "False")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Idle timeout handed to the event server factory, read in whole seconds. */
    protected static final PropertyDescriptor IDLE_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Idle Connection Timeout")
            .description("The amount of time a client's connection will remain open if no data is received. The default of 0 seconds will leave connections open until they are closed by the client.")
            .required(true)
            .defaultValue("0 seconds")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            ListenerProperties.NETWORK_INTF_NAME,
            ListenerProperties.RECV_BUFFER_SIZE,
            ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
            ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
            ListenerProperties.CHARSET,
            ListenerProperties.WORKER_THREADS,
            ListenerProperties.MAX_BATCH_SIZE,
            ListenerProperties.MESSAGE_DELIMITER,
            PORT,
            IDLE_CONNECTION_TIMEOUT,
            POOL_RECV_BUFFERS,
            SSL_CONTEXT_SERVICE,
            CLIENT_AUTH);

    /** The only relationship: every FlowFile that contains data is transferred here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Messages received successfully will be sent out this relationship.")
            .build();

    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    /** Minimum number of milliseconds between two queue-usage debug log entries. */
    private static final long TRACKING_LOG_INTERVAL = 60000L;

    /** Epoch-millisecond timestamp after which the next queue-usage log entry may be written. */
    private final AtomicLong nextTrackingLog = new AtomicLong();
    private int eventsCapacity;
    protected volatile int port;
    protected volatile TrackingLinkedBlockingQueue<ByteArrayMessage> events;
    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
    protected volatile EventServer eventServer;
    protected volatile byte[] messageDemarcatorBytes;
    protected volatile EventBatcher<ByteArrayMessage> eventBatcher;

    /**
     * Migrates legacy property names to the names used by the current descriptors
     * and drops the obsolete {@code max-receiving-threads} property.
     *
     * @param config property configuration to migrate in place
     */
    @Override
    public void migrateProperties(PropertyConfiguration config) {
        config.removeProperty("max-receiving-threads");
        config.renameProperty("pool-receive-buffers", POOL_RECV_BUFFERS.getName());
        config.renameProperty("idle-timeout", IDLE_CONNECTION_TIMEOUT.getName());
        config.renameProperty("Max Number of TCP Connections", ListenerProperties.WORKER_THREADS.getName());
        config.renameProperty("Message Delimiter", ListenerProperties.MESSAGE_DELIMITER.getName());
    }

    /**
     * Reads the processor configuration, creates the event queues and requests an
     * event server from the configured factory.
     *
     * <p>If the factory throws an {@link EventException}, the failure is logged and
     * {@link #eventServer} is left {@code null}; the exception is not propagated.</p>
     *
     * @param context process context supplying the property values
     * @throws IOException declared by the signature for callers and overriders
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
        final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final Duration idleTimeout = Duration.ofSeconds(context.getProperty(IDLE_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS));
        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
        final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());

        this.port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        this.eventsCapacity = context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
        this.events = new TrackingLinkedBlockingQueue<>(this.eventsCapacity);
        this.errorEvents = new LinkedBlockingQueue<>();
        this.messageDemarcatorBytes = getMessageDemarcator(context).getBytes(charset);

        final ByteArrayMessageNettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(
                getLogger(), address, this.port, TransportProtocol.TCP, this.messageDemarcatorBytes, bufferSize, this.events);

        final SSLContextProvider sslContextProvider = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
        if (sslContextProvider != null) {
            final ClientAuth clientAuth = ClientAuth.valueOf(context.getProperty(CLIENT_AUTH).getValue());
            final SSLContext sslContext = sslContextProvider.createContext();
            eventFactory.setSslContext(sslContext);
            eventFactory.setClientAuth(clientAuth);
        }

        final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean();
        eventFactory.setBufferAllocator(poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED);
        eventFactory.setIdleTimeout(idleTimeout);
        eventFactory.setSocketReceiveBuffer(socketBufferSize);
        eventFactory.setWorkerThreads(workerThreads);
        eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));

        try {
            this.eventServer = eventFactory.getEventServer();
        } catch (EventException e) {
            // Drop any reference left over from a previous run so that a shut-down
            // server is never mistaken for the active one after a failed bind.
            this.eventServer = null;
            getLogger().error("Failed to bind to [{}:{}]", address, this.port, e);
        }
    }

    /**
     * Describes the TCP port this component would listen on for the given configuration.
     *
     * @param context configuration context used to evaluate the {@link #PORT} property
     * @return a single-element list for the configured port, or an empty list when the
     *         property evaluates to {@code null}
     */
    @Override
    public List<ListenPort> getListenPorts(ConfigurationContext context) {
        final Integer portNumber = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        if (portNumber == null) {
            return List.of();
        }
        final ListenPort listenPort = StandardListenPort.builder()
                .portNumber(portNumber)
                .portName(PORT.getDisplayName())
                .transportProtocol(org.apache.nifi.components.listen.TransportProtocol.TCP)
                .build();
        return List.of(listenPort);
    }

    /**
     * @return the port reported by the event server, or {@code 0} when no event server is set
     */
    public int getListeningPort() {
        return this.eventServer == null ? 0 : this.eventServer.getListeningPort();
    }

    /**
     * Pulls up to the configured batch size of queued events, grouped by sender, and
     * forwards the resulting FlowFiles via {@link #processEvents(ProcessSession, Map)}.
     *
     * @param context process context supplying the batch size
     * @param session session used to create and transfer FlowFiles
     * @throws ProcessException declared by the overridden method
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        processTrackingLog();
        final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
        final Map<String, FlowFileEventBatch<ByteArrayMessage>> batches =
                getEventBatcher().getBatches(session, batchSize, this.messageDemarcatorBytes);
        processEvents(session, batches);
    }

    /**
     * Transfers each non-empty batch to {@link #REL_SUCCESS} with sender and client
     * certificate attributes, and removes FlowFiles that ended up without content or events.
     */
    private void processEvents(ProcessSession session, Map<String, FlowFileEventBatch<ByteArrayMessage>> batches) {
        for (Map.Entry<String, FlowFileEventBatch<ByteArrayMessage>> entry : batches.entrySet()) {
            final FlowFileEventBatch<ByteArrayMessage> batch = entry.getValue();
            FlowFile flowFile = batch.getFlowFile();
            final List<ByteArrayMessage> batchEvents = batch.getEvents();

            if (flowFile.getSize() == 0L || batchEvents.isEmpty()) {
                session.remove(flowFile);
                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
                continue;
            }

            final Map<String, String> attributes = getAttributes(batch);
            // Certificate attributes are taken from the first event of the batch only.
            addClientCertificateAttributes(attributes, batchEvents.getFirst());
            flowFile = session.putAllAttributes(flowFile, attributes);

            getLogger().debug("Transferring {} to success", flowFile);
            session.transfer(flowFile, REL_SUCCESS);
            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);

            final String transitUri = getTransitUri(batch);
            session.getProvenanceReporter().receive(flowFile, transitUri);
        }
    }

    /**
     * Shuts down the event server, if one was created, and discards the cached
     * event batcher so that a new one is built on the next trigger.
     */
    @OnStopped
    public void stopped() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
        }
        this.eventBatcher = null;
    }

    /**
     * Builds the base FlowFile attributes for a batch.
     *
     * @param batch batch whose first event supplies the sender; must contain at least one event
     * @return a mutable map holding {@code tcp.sender} (sender of the first event) and
     *         {@code tcp.port} (the value of {@link #port})
     */
    protected Map<String, String> getAttributes(FlowFileEventBatch<ByteArrayMessage> batch) {
        final String sender = batch.getEvents().getFirst().getSender();
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("tcp.sender", sender);
        attributes.put("tcp.port", String.valueOf(this.port));
        return attributes;
    }

    /**
     * Builds the provenance transit URI for a batch in the form {@code tcp://<sender>:<port>}.
     *
     * @param batch batch whose first event supplies the sender; must contain at least one event
     * @return the transit URI, with a single leading {@code /} stripped from the sender
     *         when the sender is longer than one character
     */
    protected String getTransitUri(FlowFileEventBatch<ByteArrayMessage> batch) {
        final String sender = batch.getEvents().getFirst().getSender();
        final boolean hasLeadingSlash = sender.startsWith("/") && sender.length() > 1;
        final String senderHost = hasLeadingSlash ? sender.substring(1) : sender;
        return String.format("tcp://%s:%d", senderHost, this.port);
    }

    /**
     * @return the immutable set containing only {@link #REL_SUCCESS}
     */
    @Override
    public final Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the immutable list of properties supported by this processor
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the configured message delimiter with the two-character escape
     * sequences {@code \n}, {@code \r} and {@code \t} replaced by the control
     * characters they denote.
     */
    private String getMessageDemarcator(ProcessContext context) {
        return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
                .getValue()
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }

    /**
     * Returns the cached event batcher, creating it on first use. Batches are keyed
     * by the sender of each event.
     */
    private EventBatcher<ByteArrayMessage> getEventBatcher() {
        if (this.eventBatcher == null) {
            // NOTE(review): the leading `this` argument looks like a decompiler-synthesised
            // outer-instance parameter; confirm against the EventBatcher constructor.
            this.eventBatcher = new EventBatcher<ByteArrayMessage>(this, this.getLogger(), this.events, this.errorEvents) {

                @Override
                protected String getBatchKey(ByteArrayMessage event) {
                    return event.getSender();
                }
            };
        }
        return this.eventBatcher;
    }

    /**
     * Adds the client certificate subject and issuer names to {@code attributes}
     * when the event carries an SSL session status; otherwise leaves the map unchanged.
     */
    private void addClientCertificateAttributes(Map<String, String> attributes, ByteArrayMessage event) {
        final SslSessionStatus sslSessionStatus = event.getSslSessionStatus();
        if (sslSessionStatus == null) {
            return;
        }
        attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, sslSessionStatus.getSubject().getName());
        attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
    }

    /**
     * Writes a debug entry describing event queue usage, at most once per
     * {@link #TRACKING_LOG_INTERVAL} milliseconds.
     */
    private void processTrackingLog() {
        final long now = Instant.now().toEpochMilli();
        if (now > this.nextTrackingLog.get()) {
            getLogger().debug("Event Queue Capacity [{}] Remaining [{}] Size [{}] Largest Size [{}]",
                    this.eventsCapacity, this.events.remainingCapacity(), this.events.size(), this.events.getLargestSize());
            this.nextTrackingLog.set(now + TRACKING_LOG_INTERVAL);
        }
    }
}

