/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.box;

import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxEvent;
import com.box.sdk.EventListener;
import com.box.sdk.EventStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.box.controllerservices.BoxClientService;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.box.BoxEventJsonArrayWriter;
import org.apache.nifi.processors.box.FetchBoxFile;
import org.apache.nifi.processors.box.ListBoxFile;
import org.apache.nifi.processors.box.PutBoxFile;

/**
 * Processor that listens to a Box {@link EventStream}, buffers the received
 * {@link BoxEvent}s in an in-memory bounded queue and, on each trigger, writes
 * all buffered events into a single FlowFile as a JSON array routed to
 * {@link #REL_SUCCESS}.
 *
 * <p>The most recent stream position reported by the listener is saved in
 * cluster-scoped processor state under the key {@code "position"} and is read
 * back in {@link #onScheduled(ProcessContext)} to resume the stream.</p>
 */
@PrimaryNodeOnly
@TriggerSerially
@Tags(value={"box", "storage"})
@CapabilityDescription(value="Consumes all events from Box. This processor can be used to capture events such as uploads, modifications, deletions, etc.\nThe content of the events is sent to the 'success' relationship as a JSON array. Events can be dropped in case of NiFi restart\nor if the queue capacity is exceeded. The last known position of the Box stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n")
@SeeAlso(value={FetchBoxFile.class, PutBoxFile.class, ListBoxFile.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Stateful(description="The last known position of the Box stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n", scopes={Scope.CLUSTER})
public class ConsumeBoxEvents
extends AbstractProcessor
implements VerifiableProcessor {
    /** State-map key under which the last stream position is stored. */
    private static final String POSITION_KEY = "position";

    /** Capacity of the in-memory queue that buffers events between the stream listener and {@code onTrigger}. */
    public static final PropertyDescriptor QUEUE_CAPACITY = new PropertyDescriptor.Builder().name("Queue Capacity").description("The maximum size of the internal queue used to buffer events being transferred from the underlying stream to the processor.\nSetting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total\nmemory used by the processor during these surges.\n").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("10000").required(true).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(BoxClientService.BOX_CLIENT_SERVICE, QUEUE_CAPACITY);

    /** Relationship that receives every FlowFile produced by this processor. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Events received successfully will be sent out this relationship.").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    private volatile BoxAPIConnection boxAPIConnection;
    private volatile EventStream eventStream;
    /** Buffer filled by the stream listener and drained by {@link #onTrigger}. */
    protected volatile BlockingQueue<BoxEvent> events;
    /** Last position seen by this instance; used only for log messages. */
    private final AtomicLong position = new AtomicLong(0L);

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @Override
    public final Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Obtains the Box connection, creates the event stream (resuming from the
     * stored position when one exists), (re)builds the event queue with the
     * configured capacity, registers the listener and starts the stream.
     *
     * @param context the process context supplying properties and the state manager
     * @throws ProcessException if the stored position cannot be read or parsed,
     *         or the event stream cannot be created
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        final BoxClientService boxClientService = context.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class);
        this.boxAPIConnection = boxClientService.getBoxApiConnection();
        try {
            final String storedPosition = context.getStateManager().getState(Scope.CLUSTER).get(POSITION_KEY);
            if (storedPosition == null) {
                this.eventStream = new EventStream(this.boxAPIConnection);
            } else {
                final long startPosition = Long.parseLong(storedPosition);
                // Seed the in-memory tracker so that error logs emitted before the first
                // onNextPosition callback report the position we actually resumed from.
                this.position.set(startPosition);
                this.eventStream = new EventStream(this.boxAPIConnection, startPosition);
            }
        } catch (Exception e) {
            throw new ProcessException("Could not retrieve last event position", e);
        }

        final int queueCapacity = context.getProperty(QUEUE_CAPACITY).asInteger();
        this.events = newQueueFrom(this.events, queueCapacity);

        this.eventStream.addListener(new EventListener() {

            @Override
            public void onEvent(BoxEvent event) {
                try {
                    // put() blocks while the queue is full.
                    ConsumeBoxEvents.this.events.put(event);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while trying to put the event into the queue", e);
                }
            }

            @Override
            public void onNextPosition(long pos) {
                try {
                    context.getStateManager().setState(Map.of(ConsumeBoxEvents.POSITION_KEY, String.valueOf(pos)), Scope.CLUSTER);
                    ConsumeBoxEvents.this.position.set(pos);
                } catch (IOException e) {
                    ConsumeBoxEvents.this.getLogger().warn("Failed to save position {} in processor state", pos, e);
                }
            }

            @Override
            public boolean onException(Throwable e) {
                ConsumeBoxEvents.this.getLogger().warn("An error has been received from the stream. Last tracked position {}", ConsumeBoxEvents.this.position.get(), e);
                // NOTE(review): the meaning of the returned flag is defined by the Box SDK
                // EventListener contract; the value is kept as in the original code.
                return true;
            }
        });
        this.eventStream.start();
    }

    /**
     * Stops the event stream if one exists and is currently started.
     */
    @OnStopped
    public void stopped() {
        if (this.eventStream != null && this.eventStream.isStarted()) {
            this.eventStream.stop();
        }
    }

    /**
     * Verifies the configuration by refreshing the Box API connection obtained
     * from the configured {@link BoxClientService}.
     *
     * @param context the process context supplying the controller service
     * @param verificationLogger logger used to report verification failures
     * @param attributes FlowFile attributes supplied for verification (not used)
     * @return a single-element list describing whether the connection refresh succeeded
     */
    @Override
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        final List<ConfigVerificationResult> results = new ArrayList<>();
        final BoxClientService boxClientService = context.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class);
        // Use a local connection: verification should not overwrite the processor's runtime field.
        final BoxAPIConnection connection = boxClientService.getBoxApiConnection();
        try {
            connection.refresh();
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Box API Connection").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully validated Box connection").build());
        } catch (Exception e) {
            verificationLogger.warn("Failed to verify configuration", e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Box API Connection").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to validate Box connection: %s", e.getMessage())).build());
        }
        return results;
    }

    /**
     * Drains all buffered events into one FlowFile (JSON array) and transfers it
     * to {@link #REL_SUCCESS}. Yields when no events are buffered. If writing
     * fails, the drained events are offered back to the queue, the FlowFile is
     * removed and the processor yields.
     *
     * @param context the process context
     * @param session the session used to create, write and transfer the FlowFile
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.events.isEmpty()) {
            context.yield();
            return;
        }

        FlowFile flowFile = session.create();
        final List<BoxEvent> boxEvents = new ArrayList<>();
        final int recordCount = this.events.drainTo(boxEvents);

        try (OutputStream out = session.write(flowFile);
             BoxEventJsonArrayWriter writer = BoxEventJsonArrayWriter.create(out)) {
            for (final BoxEvent event : boxEvents) {
                writer.write(event);
            }
        } catch (Exception e) {
            this.getLogger().error("Failed to write events to FlowFile; will re-queue events and try again", e);
            requeue(boxEvents);
            session.remove(flowFile);
            context.yield();
            return;
        }

        // putAttribute returns the updated FlowFile; keep the latest reference for the transfer.
        flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(recordCount));
        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Creates a queue with the given capacity and copies over as many events from
     * the previous queue as fit. The previous queue is not modified.
     */
    private BlockingQueue<BoxEvent> newQueueFrom(final BlockingQueue<BoxEvent> previous, final int capacity) {
        final BlockingQueue<BoxEvent> queue = new LinkedBlockingQueue<>(capacity);
        if (previous == null) {
            return queue;
        }

        // offer() instead of addAll(): addAll throws IllegalStateException once the new
        // (possibly smaller) queue is full, which would prevent the processor from starting.
        int dropped = 0;
        for (final BoxEvent event : previous) {
            if (!queue.offer(event)) {
                dropped++;
            }
        }
        if (dropped > 0) {
            this.getLogger().warn("Dropped {} buffered events because they do not fit into the queue capacity of {}", dropped, capacity);
        }
        return queue;
    }

    /**
     * Offers the given events back to the queue without blocking and logs how many
     * could not be re-queued because the queue was full.
     */
    private void requeue(final List<BoxEvent> boxEvents) {
        int dropped = 0;
        for (final BoxEvent event : boxEvents) {
            if (!this.events.offer(event)) {
                dropped++;
            }
        }
        if (dropped > 0) {
            this.getLogger().warn("Dropped {} events that could not be re-queued because the queue is full", dropped);
        }
    }
}

