/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.tests.system;

import java.time.Duration;
import java.util.Set;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@CapabilityDescription(value="Generates an empty FlowFile and counts how many times the session commit callback is called for both success and failure before and after Processor is stopped")
public class GenerateAndCountCallbacks
extends AbstractProcessor {
    public static final String STOPPED_AND_SUCCESS = "Stopped and Success";
    public static final String STOPPED_AND_FAILURE = "Stopped and Failure";
    public static final String RUNNING_AND_SUCCESS = "Running and Success";
    public static final String RUNNING_AND_FAILURE = "Running and Failure";
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to this relationship").build();
    private volatile boolean stopped = false;

    public Set<Relationship> getRelationships() {
        return Set.of(REL_SUCCESS);
    }

    @OnScheduled
    public void onScheduled() {
        this.stopped = false;
        this.getLogger().info("Processor started");
    }

    @OnStopped
    public void onStopped() {
        this.stopped = true;
        this.getLogger().info("Processor stopped");
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.create();
        session.transfer(flowFile, REL_SUCCESS);
        session.commitAsync(() -> {
            this.sleepUninterruptibly(Duration.ofSeconds(3L));
            if (this.stopped) {
                session.adjustCounter(STOPPED_AND_SUCCESS, 1L, true);
                this.getLogger().error("Success callback called after Processor Stopped");
            } else {
                session.adjustCounter(RUNNING_AND_SUCCESS, 1L, true);
                this.getLogger().info("Success callback called while Processor Running");
            }
        }, failureCause -> {
            this.sleepUninterruptibly(Duration.ofSeconds(3L));
            if (this.stopped) {
                session.adjustCounter(STOPPED_AND_FAILURE, 1L, true);
                this.getLogger().error("Failure callback called after Processor Stopped; Failure cause: {}", new Object[]{failureCause.toString()});
            } else {
                session.adjustCounter(RUNNING_AND_FAILURE, 1L, true);
                this.getLogger().warn("Failure callback called while Processor Running; Failure cause: {}", new Object[]{failureCause.toString()});
            }
        });
    }

    private void sleepUninterruptibly(Duration duration) {
        long endTime = System.currentTimeMillis() + duration.toMillis();
        while (System.currentTimeMillis() < endTime) {
            try {
                Thread.sleep(endTime - System.currentTimeMillis());
            }
            catch (InterruptedException interruptedException) {}
        }
    }
}

