/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.box;

import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxEvent;
import com.box.sdk.EnterpriseEventsStreamRequest;
import com.box.sdk.EventLog;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.box.controllerservices.BoxClientService;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.box.AbstractBoxProcessor;
import org.apache.nifi.processors.box.BoxEventJsonArrayWriter;
import org.apache.nifi.processors.box.ConsumeBoxEvents;
import org.apache.nifi.processors.box.FetchBoxFile;
import org.apache.nifi.processors.box.ListBoxFile;

@PrimaryNodeOnly
@TriggerSerially
@Tags(value={"box", "storage"})
@CapabilityDescription(value="Consumes Enterprise Events from Box admin_logs_streaming Stream Type.\nThe content of the events is sent to the 'success' relationship as a JSON array.\nThe last known position of the Box stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n")
@SeeAlso(value={ConsumeBoxEvents.class, FetchBoxFile.class, ListBoxFile.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Stateful(description="The last known position of the Box Event stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n", scopes={Scope.CLUSTER})
public class ConsumeBoxEnterpriseEvents
extends AbstractBoxProcessor {
    private static final String POSITION_KEY = "position";
    private static final String EARLIEST_POSITION = "0";
    private static final String LATEST_POSITION = "now";
    private static final int LIMIT = 500;
    static final String COUNTER_RECORDS_PROCESSED = "Records Processed";
    static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder().name("Event Types").description("A comma separated list of Enterprise Events to consume. If not set, all Events are consumed.See Additional Details for more information.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor START_EVENT_POSITION = new PropertyDescriptor.Builder().name("Start Event Position").description("What position to consume the Events from.").required(true).allowableValues(StartEventPosition.class).defaultValue((DescribedValue)StartEventPosition.EARLIEST).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor START_OFFSET = new PropertyDescriptor.Builder().name("Start Offset").description("The offset to start consuming the Events from.").required(true).dependsOn(START_EVENT_POSITION, (DescribedValue)StartEventPosition.OFFSET, new DescribedValue[0]).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(BOX_CLIENT_SERVICE, EVENT_TYPES, START_EVENT_POSITION, START_OFFSET);
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Events received successfully will be sent out this relationship.").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    private volatile BoxAPIConnection boxAPIConnection;
    private volatile String[] eventTypes;
    private volatile String streamPosition;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @OnScheduled
    public void onEnabled(ProcessContext context) {
        BoxClientService boxClientService = (BoxClientService)context.getProperty(BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class);
        this.boxAPIConnection = boxClientService.getBoxApiConnection();
        this.eventTypes = context.getProperty(EVENT_TYPES).isSet() ? context.getProperty(EVENT_TYPES).getValue().split(",") : new String[]{};
        this.streamPosition = this.calculateStreamPosition(context);
    }

    private String calculateStreamPosition(ProcessContext context) {
        return this.readStreamPosition(context).orElseGet(() -> this.initializeStartEventPosition(context));
    }

    private Optional<String> readStreamPosition(ProcessContext context) {
        try {
            String position = context.getStateManager().getState(Scope.CLUSTER).get(POSITION_KEY);
            return Optional.ofNullable(position);
        }
        catch (IOException e) {
            throw new ProcessException("Could not retrieve saved event position", (Throwable)e);
        }
    }

    private void writeStreamPosition(String position, ProcessSession session) {
        try {
            Map<String, String> stateMap = Map.of(POSITION_KEY, position);
            session.setState(stateMap, Scope.CLUSTER);
        }
        catch (IOException e) {
            throw new ProcessException("Could not save event position", (Throwable)e);
        }
    }

    private String initializeStartEventPosition(ProcessContext context) {
        StartEventPosition startEventPosition = (StartEventPosition)context.getProperty(START_EVENT_POSITION).asAllowableValue(StartEventPosition.class);
        return switch (startEventPosition.ordinal()) {
            default -> throw new MatchException(null, null);
            case 0 -> EARLIEST_POSITION;
            case 1 -> this.retrieveLatestStreamPosition();
            case 2 -> context.getProperty(START_OFFSET).getValue();
        };
    }

    private String retrieveLatestStreamPosition() {
        EventLog eventLog = this.getEventLog(LATEST_POSITION);
        return eventLog.getNextStreamPosition();
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        while (this.isScheduled()) {
            this.getLogger().debug("Consuming Box Events from position: {}", new Object[]{this.streamPosition});
            EventLog eventLog = this.getEventLog(this.streamPosition);
            this.streamPosition = eventLog.getNextStreamPosition();
            this.getLogger().debug("Consumed {} Box Enterprise Events. New position: {}", new Object[]{eventLog.getSize(), this.streamPosition});
            this.writeStreamPosition(this.streamPosition, session);
            if (eventLog.getSize() == 0) break;
            this.writeLogAsRecords(eventLog, session);
        }
    }

    EventLog getEventLog(String position) {
        EnterpriseEventsStreamRequest request = new EnterpriseEventsStreamRequest().limit(500).position(position).typeNames(this.eventTypes);
        return EventLog.getEnterpriseEventsStream((BoxAPIConnection)this.boxAPIConnection, (EnterpriseEventsStreamRequest)request);
    }

    private void writeLogAsRecords(EventLog eventLog, ProcessSession session) {
        FlowFile flowFile = session.create();
        try (OutputStream out = session.write(flowFile);
             BoxEventJsonArrayWriter writer = BoxEventJsonArrayWriter.create(out);){
            for (BoxEvent event : eventLog) {
                writer.write(event);
            }
        }
        catch (IOException e) {
            throw new ProcessException("Failed to write Box Event into a FlowFile", (Throwable)e);
        }
        session.adjustCounter(COUNTER_RECORDS_PROCESSED, (long)eventLog.getSize(), false);
        session.putAttribute(flowFile, "record.count", String.valueOf(eventLog.getSize()));
        session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        session.transfer(flowFile, REL_SUCCESS);
    }

    public static enum StartEventPosition implements DescribedValue
    {
        EARLIEST("earliest", "Start consuming events from the earliest available Event."),
        LATEST("latest", "Start consuming events from the latest Event."),
        OFFSET("offset", "Start consuming events from the specified offset.");

        private final String value;
        private final String description;

        private StartEventPosition(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }
}

