/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.slack;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.slack.api.app_backend.events.payload.EventsApiPayload;
import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.bolt.context.builtin.EventContext;
import com.slack.api.bolt.context.builtin.SlashCommandContext;
import com.slack.api.bolt.request.builtin.SlashCommandRequest;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.socket_mode.SocketModeApp;
import com.slack.api.model.User;
import com.slack.api.model.event.AppMentionEvent;
import com.slack.api.model.event.FileChangeEvent;
import com.slack.api.model.event.FileCreatedEvent;
import com.slack.api.model.event.FileDeletedEvent;
import com.slack.api.model.event.FilePublicEvent;
import com.slack.api.model.event.FileSharedEvent;
import com.slack.api.model.event.FileUnsharedEvent;
import com.slack.api.model.event.MemberJoinedChannelEvent;
import com.slack.api.model.event.MessageChangedEvent;
import com.slack.api.model.event.MessageChannelJoinEvent;
import com.slack.api.model.event.MessageDeletedEvent;
import com.slack.api.model.event.MessageEvent;
import com.slack.api.model.event.MessageFileShareEvent;
import com.slack.api.model.event.ReactionAddedEvent;
import com.slack.api.model.event.ReactionRemovedEvent;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.ConsumeSlack;
import org.apache.nifi.processors.slack.consume.UserDetailsLookup;

/**
 * Processor that connects to Slack through a {@link SocketModeApp} and writes every received
 * event or slash command to a FlowFile as JSON, routed to {@link #REL_SUCCESS}.
 *
 * <p>Hand-off between the Slack callbacks and {@link #onTrigger(ProcessContext, ProcessSession)}
 * goes through an in-memory {@link TransferQueue}: a callback enqueues the notification together
 * with a {@link CountDownLatch} and waits up to {@value #ACK_TIMEOUT_SECONDS} seconds for it;
 * {@code onTrigger} releases the latch from the session's asynchronous commit callback. Only then
 * is the notification acknowledged to Slack; otherwise a 503 error response is returned.
 */
@PrimaryNodeOnly
@DefaultSettings(yieldDuration="250 millis")
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="Set to application/json, as the output will always be in JSON format"), @WritesAttribute(attribute="slack.event.type", description="Set to the type of Slack event that occurred")})
@SeeAlso(value={ConsumeSlack.class})
@Tags(value={"slack", "real-time", "event", "message", "command", "listen", "receive", "social media", "team", "text", "unstructured"})
@CapabilityDescription(value="Retrieves real-time messages or Slack commands from one or more Slack conversations. The messages are written out in JSON format. Note that this Processor should be used to obtain real-time messages and commands from Slack and does not provide a mechanism for obtaining historical messages. The ConsumeSlack Processor should be used for an initial load of messages from a channel. See Usage / Additional Details for more information about how to configure this Processor and enable it to retrieve messages and commands from Slack.")
public class ListenSlack
extends AbstractProcessor {
    /** Shared JSON mapper; the Java Time module is registered once at class initialisation. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    /** Matches every slash command name, so all commands reach {@link #handleCommand}. */
    private static final Pattern ANY_COMMAND = Pattern.compile(".*");

    /** How long a Slack callback waits for the session commit before answering with an error. */
    private static final long ACK_TIMEOUT_SECONDS = 5L;

    static final AllowableValue RECEIVE_MESSAGE_EVENTS = new AllowableValue("Receive Message Events", "Receive Message Events", "The Processor is to receive Slack Message Events");
    static final AllowableValue RECEIVE_MENTION_EVENTS = new AllowableValue("Receive App Mention Events", "Receive App Mention Events", "The Processor is to receive only slack messages that mention the bot user (App Mention Events)");
    static final AllowableValue RECEIVE_COMMANDS = new AllowableValue("Receive Commands", "Receive Commands", "The Processor is to receive Commands from Slack that are specific to your application. The Processor will not receive Message Events.");
    static final AllowableValue RECEIVE_JOINED_CHANNEL_EVENTS = new AllowableValue("Receive Joined Channel Events", "Receive Joined Channel Events", "The Processor is to receive only events when a member is joining a channel. The Processor will not receive Message Events.");

    static final PropertyDescriptor APP_TOKEN = new PropertyDescriptor.Builder()
            .name("App Token")
            .description("The Application Token that is registered to your Slack application")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .sensitive(true)
            .build();

    static final PropertyDescriptor BOT_TOKEN = new PropertyDescriptor.Builder()
            .name("Bot Token")
            .description("The Bot Token that is registered to your Slack application")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .sensitive(true)
            .build();

    static final PropertyDescriptor EVENT_TYPE = new PropertyDescriptor.Builder()
            .name("Event Type to Receive")
            .description("Specifies the type of Event that the Processor should respond to")
            .required(true)
            .defaultValue(RECEIVE_MENTION_EVENTS.getValue())
            .allowableValues(RECEIVE_MENTION_EVENTS, RECEIVE_MESSAGE_EVENTS, RECEIVE_COMMANDS, RECEIVE_JOINED_CHANNEL_EVENTS)
            .build();

    static final PropertyDescriptor RESOLVE_USER_DETAILS = new PropertyDescriptor.Builder()
            .name("Resolve User Details")
            .description("Specifies whether the Processor should lookup details about the Slack User who sent the received message. If true, the output JSON will contain an additional field named 'userDetails'. The 'user' field will still contain the ID of the user. In order to enable this capability, the Bot Token must be granted the 'users:read' and optionally the 'users.profile:read' Bot Token Scope. If the rate limit is exceeded when retrieving this information, the received message will be rejected and must be re-delivered.")
            .required(true)
            .defaultValue("false")
            .allowableValues("true", "false")
            .dependsOn(EVENT_TYPE, RECEIVE_MESSAGE_EVENTS, RECEIVE_MENTION_EVENTS, RECEIVE_JOINED_CHANNEL_EVENTS)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are created will be sent to this Relationship.")
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(APP_TOKEN, BOT_TOKEN, EVENT_TYPE, RESOLVE_USER_DETAILS);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    /** Notifications received from Slack that have not yet been picked up by {@code onTrigger}. */
    private final TransferQueue<EventWrapper> eventTransferQueue = new LinkedTransferQueue<>();
    private volatile SocketModeApp socketModeApp;
    private volatile UserDetailsLookup userDetailsLookup;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Builds the Slack {@link App}, registers the handlers selected by {@link #EVENT_TYPE} and
     * starts the Socket Mode connection asynchronously.
     *
     * @param context supplies the App Token, Bot Token and Event Type property values
     * @throws Exception if the Slack application or its Socket Mode wrapper cannot be created or started
     */
    @OnScheduled
    public void establishWebsocketEndpoint(ProcessContext context) throws Exception {
        final String appToken = context.getProperty(APP_TOKEN).getValue();
        final String botToken = context.getProperty(BOT_TOKEN).getValue();
        final String eventType = context.getProperty(EVENT_TYPE).getValue();

        final AppConfig appConfig = AppConfig.builder().singleTeamBotToken(botToken).build();
        final App slackApp = new App(appConfig);

        if (RECEIVE_MESSAGE_EVENTS.getValue().equals(eventType)) {
            slackApp.event(MessageEvent.class, this::handleEvent);
            slackApp.event(MessageChangedEvent.class, this::handleEvent);
            slackApp.event(MessageDeletedEvent.class, this::handleEvent);
            slackApp.event(MessageFileShareEvent.class, this::handleEvent);
            slackApp.event(FileSharedEvent.class, this::handleEvent);
            slackApp.event(FileChangeEvent.class, this::handleEvent);
            slackApp.event(FileCreatedEvent.class, this::handleEvent);
            slackApp.event(FileDeletedEvent.class, this::handleEvent);
            slackApp.event(FilePublicEvent.class, this::handleEvent);
            slackApp.event(FileUnsharedEvent.class, this::handleEvent);
            slackApp.event(ReactionAddedEvent.class, this::handleEvent);
            slackApp.event(ReactionRemovedEvent.class, this::handleEvent);
        } else if (RECEIVE_MENTION_EVENTS.getValue().equals(eventType)) {
            slackApp.event(AppMentionEvent.class, this::handleEvent);
            // Plain message events are acknowledged immediately and never queued.
            slackApp.event(MessageEvent.class, (payload, ctx) -> ctx.ack());
        } else if (RECEIVE_JOINED_CHANNEL_EVENTS.getValue().equals(eventType)) {
            slackApp.event(MemberJoinedChannelEvent.class, this::handleEvent);
            slackApp.event(MessageChannelJoinEvent.class, this::handleEvent);
            // Plain message events are acknowledged immediately and never queued.
            slackApp.event(MessageEvent.class, (payload, ctx) -> ctx.ack());
        } else {
            // Remaining allowable value: slash commands of any name.
            slackApp.command(ANY_COMMAND, this::handleCommand);
        }

        this.userDetailsLookup = new UserDetailsLookup(userId -> slackApp.client().usersInfo(r -> r.user(userId)), this.getLogger());
        this.socketModeApp = new SocketModeApp(appToken, slackApp);
        this.socketModeApp.startAsync();
    }

    /** Queues the event carried by the payload and acknowledges it once it has been committed. */
    private Response handleEvent(EventsApiPayload<?> eventsApiPayload, EventContext context) {
        return this.handleNotification(eventsApiPayload.getEvent(), () -> context.ack());
    }

    /** Queues the slash command payload and acknowledges it once it has been committed. */
    private Response handleCommand(SlashCommandRequest request, SlashCommandContext context) {
        return this.handleNotification(request.getPayload(), () -> context.ack());
    }

    /**
     * Enqueues a notification for {@code onTrigger} and waits for its latch to be released.
     *
     * <p>NOTE(review): when the wait times out the wrapper stays in the queue, so a later
     * {@code onTrigger} still emits it although an error was returned here. Confirm whether a
     * redelivery by Slack can then produce a duplicate FlowFile.
     *
     * @param notification the event or command payload to emit
     * @param ackSupplier  produces the acknowledgement response; invoked only after the latch is released
     * @return the acknowledgement, or a 503 error response if the latch was not released in time
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored first
     */
    private Response handleNotification(Object notification, Supplier<Response> ackSupplier) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.eventTransferQueue.add(new EventWrapper(notification, countDownLatch));
        try {
            final boolean acknowledged = countDownLatch.await(ACK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            return acknowledged ? ackSupplier.get() : Response.error(503);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code that owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for event to be processed", e);
        }
    }

    /**
     * Stops and closes the Socket Mode connection, if one was established.
     *
     * @throws Exception if stopping or closing the connection fails
     */
    @OnStopped
    public void onStopped() throws Exception {
        final SocketModeApp app = this.socketModeApp;
        if (app == null) {
            // Scheduling never got far enough to create the connection; nothing to release.
            return;
        }
        this.socketModeApp = null;
        try {
            app.stop();
        } finally {
            // Always attempt to close, even when stop() failed.
            app.close();
        }
    }

    /**
     * Takes at most one queued notification, writes it as JSON to a new FlowFile and transfers it
     * to {@link #REL_SUCCESS}. The notification's latch is released from the asynchronous commit
     * callback. Yields when the queue is empty.
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final EventWrapper eventWrapper;
        try {
            eventWrapper = this.eventTransferQueue.poll(1L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (eventWrapper == null) {
            context.yield();
            return;
        }

        final Object messageEvent = eventWrapper.getEvent();
        final String eventType = messageEvent.getClass().getSimpleName();

        FlowFile flowFile = session.create();
        try (OutputStream out = session.write(flowFile);
             JsonGenerator generator = OBJECT_MAPPER.createGenerator(out)) {
            // asBoolean() yields a boxed Boolean; comparing against TRUE avoids unboxing a null.
            final boolean resolveUserDetails = Boolean.TRUE.equals(context.getProperty(RESOLVE_USER_DETAILS).asBoolean());
            this.writeEvent(generator, messageEvent, resolveUserDetails);
        } catch (IOException e) {
            // The latch is intentionally not released, so the waiting callback will not acknowledge.
            this.getLogger().error("Failed to write out Slack message", e);
            session.remove(flowFile);
            return;
        }

        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        flowFile = session.putAttribute(flowFile, "slack.event.type", eventType);
        session.getProvenanceReporter().receive(flowFile, this.socketModeApp.getClient().getWssUri().toString());
        session.transfer(flowFile, REL_SUCCESS);
        // Release the waiting Slack callback only after the session has been committed.
        session.commitAsync(() -> eventWrapper.getCountDownLatch().countDown());
    }

    /**
     * Serialises a notification as JSON, optionally adding a {@code userDetails} field.
     *
     * @param generator          destination for the JSON
     * @param event              the notification to serialise
     * @param resolveUserDetails whether to look up the user named by the {@code user} field
     * @throws IOException if serialisation or writing fails
     */
    private void writeEvent(JsonGenerator generator, Object event, boolean resolveUserDetails) throws IOException {
        if (!resolveUserDetails) {
            generator.writeObject(event);
            return;
        }

        final JsonNode eventNode = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(event));
        if (eventNode.hasNonNull("user")) {
            final String userId = eventNode.get("user").asText();
            final User userDetails = this.userDetailsLookup.getUserDetails(userId);
            if (userDetails != null) {
                final JsonNode userDetailsNode = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(userDetails));
                ((ObjectNode) eventNode).set("userDetails", userDetailsNode);
            }
        }
        generator.writeTree(eventNode);
    }

    /** Pairs a queued notification with the latch its Slack callback is waiting on. */
    private static final class EventWrapper {
        private final Object event;
        private final CountDownLatch countDownLatch;

        public EventWrapper(Object event, CountDownLatch countDownLatch) {
            this.event = event;
            this.countDownLatch = countDownLatch;
        }

        public Object getEvent() {
            return this.event;
        }

        public CountDownLatch getCountDownLatch() {
            return this.countDownLatch;
        }
    }
}

