/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.email.smtp;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.email.ListenSMTP;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;

public class SmtpConsumer
implements MessageHandler {
    private String from = null;
    private final List<String> recipientList = new ArrayList<String>();
    private final MessageContext context;
    private final ProcessSessionFactory sessionFactory;
    private final int port;
    private final int maxMessageSize;
    private final ComponentLog log;
    private final String host;

    public SmtpConsumer(MessageContext context, ProcessSessionFactory sessionFactory, int port, String host, ComponentLog log, int maxMessageSize) {
        this.context = context;
        this.sessionFactory = sessionFactory;
        this.port = port;
        this.host = host == null || host.trim().isEmpty() ? context.getSMTPServer().getHostName() : host;
        this.log = log;
        this.maxMessageSize = maxMessageSize;
    }

    String getFrom() {
        return this.from;
    }

    List<String> getRecipients() {
        return Collections.unmodifiableList(this.recipientList);
    }

    public void data(InputStream data) throws RejectException, TooMuchDataException, IOException {
        ProcessSession processSession = this.sessionFactory.createSession();
        StopWatch watch = new StopWatch();
        watch.start();
        try {
            FlowFile flowFile = processSession.create();
            AtomicBoolean limitExceeded = new AtomicBoolean(false);
            flowFile = processSession.write(flowFile, out -> {
                LimitingInputStream lis = new LimitingInputStream(data, (long)this.maxMessageSize);
                IOUtils.copy((InputStream)lis, (OutputStream)out);
                if (lis.hasReachedLimit()) {
                    limitExceeded.set(true);
                }
            });
            if (limitExceeded.get()) {
                throw new TooMuchDataException("Maximum message size limit reached - client must send smaller messages");
            }
            flowFile = processSession.putAllAttributes(flowFile, this.extractMessageAttributes());
            watch.stop();
            processSession.getProvenanceReporter().receive(flowFile, "smtp://" + this.host + ":" + this.port + "/", watch.getDuration(TimeUnit.MILLISECONDS));
            processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS);
            processSession.commit();
        }
        catch (IOException | IllegalStateException | FlowFileAccessException | RejectException ex) {
            this.log.error("Unable to fully process input due to " + ex.getMessage(), ex);
            throw ex;
        }
        finally {
            processSession.rollback();
        }
    }

    public void from(String from) throws RejectException {
        this.from = from;
    }

    public void recipient(String recipient) throws RejectException {
        if (recipient != null && recipient.length() < 100 && this.recipientList.size() < 100) {
            this.recipientList.add(recipient);
        }
    }

    public void done() {
    }

    private Map<String, String> extractMessageAttributes() {
        SocketAddress address;
        HashMap<String, String> attributes = new HashMap<String, String>();
        Certificate[] tlsPeerCertificates = this.context.getTlsPeerCertificates();
        if (tlsPeerCertificates != null) {
            for (int i = 0; i < tlsPeerCertificates.length; ++i) {
                if (!(tlsPeerCertificates[i] instanceof X509Certificate)) continue;
                X509Certificate x509Cert = (X509Certificate)tlsPeerCertificates[i];
                attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString());
                attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName());
            }
        }
        if ((address = this.context.getRemoteAddress()) != null) {
            String strAddress = address instanceof InetSocketAddress ? ((InetSocketAddress)address).getHostString() + ":" + ((InetSocketAddress)address).getPort() : this.context.getRemoteAddress().toString();
            attributes.put("smtp.src", strAddress);
        }
        attributes.put("smtp.helo", this.context.getHelo());
        attributes.put("smtp.from", this.from);
        for (int i = 0; i < this.recipientList.size(); ++i) {
            attributes.put("smtp.recipient." + i, this.recipientList.get(i));
        }
        attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822");
        return attributes;
    }
}

