/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.Wait;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;

@SupportsBatching
@Tags(value={"map", "cache", "notify", "distributed", "signal", "release"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Caches a release signal identifier in the distributed cache, optionally along with the FlowFile's attributes.  Any flow files held at a corresponding Wait processor will be released once this signal in the cache is discovered.")
@WritesAttribute(attribute="notified", description="All FlowFiles will have an attribute 'notified'. The value of this attribute is true, is the FlowFile is notified, otherwise false.")
@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.Wait"})
public class Notify
extends AbstractProcessor {
    public static final String NOTIFIED_ATTRIBUTE_NAME = "notified";
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("distributed-cache-service").displayName("Distributed Cache Service").description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor").required(true).identifiesControllerService(AtomicDistributedMapCacheClient.class).build();
    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder().name("release-signal-id").displayName("Release Signal Identifier").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the release signal cache key").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder().name("signal-counter-name").displayName("Signal Counter Name").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter name. Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences of different types of events, such as success or failure, or destination data source names, etc.").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("default").build();
    public static final PropertyDescriptor SIGNAL_COUNTER_DELTA = new PropertyDescriptor.Builder().name("signal-counter-delta").displayName("Signal Counter Delta").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter delta. Specify how much the counter should increase. For example, if multiple signal events are processed at upstream flow in batch oriented way, the number of events processed can be notified with this property at once. Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " + Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("1").build();
    public static final PropertyDescriptor SIGNAL_BUFFER_COUNT = new PropertyDescriptor.Builder().name("signal-buffer-count").displayName("Signal Buffer Count").description("Specify the maximum number of incoming flow files that can be buffered until signals are notified to cache service. The more buffer can provide the better performance, as it reduces the number of interactions with cache service by grouping signals by signal identifier when multiple incoming flow files share the same signal identifier.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("1").build();
    public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder().name("attribute-cache-regex").displayName("Attribute Cache Regex").description("Any attributes whose names match this regex will be stored in the distributed cache to be copied to any FlowFiles released from a corresponding Wait processor.  Note that the uuid attribute will not be cached regardless of this value.  If blank, no attributes will be cached.").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RELEASE_SIGNAL_IDENTIFIER, SIGNAL_COUNTER_NAME, SIGNAL_COUNTER_DELTA, SIGNAL_BUFFER_COUNT, DISTRIBUTED_CACHE_SERVICE, ATTRIBUTE_CACHE_REGEX);
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile;
        ComponentLog logger = this.getLogger();
        PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
        PropertyValue counterNameProperty = context.getProperty(SIGNAL_COUNTER_NAME);
        PropertyValue deltaProperty = context.getProperty(SIGNAL_COUNTER_DELTA);
        String attributeCacheRegex = context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue();
        Integer bufferCount = context.getProperty(SIGNAL_BUFFER_COUNT).asInteger();
        AtomicDistributedMapCacheClient cache = (AtomicDistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
        WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
        HashMap<String, SignalBuffer> signalBuffers = new HashMap<String, SignalBuffer>();
        for (int i = 0; i < bufferCount && (flowFile = session.get()) != null; ++i) {
            String signalId2 = signalIdProperty.evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isBlank((CharSequence)signalId2)) {
                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[]{flowFile});
                session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
                continue;
            }
            String counterName = counterNameProperty.evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isEmpty((CharSequence)counterName)) {
                counterName = "default";
            }
            int delta = 1;
            if (deltaProperty.isSet()) {
                String deltaStr = deltaProperty.evaluateAttributeExpressions(flowFile).getValue();
                try {
                    delta = Integer.parseInt(deltaStr);
                }
                catch (NumberFormatException e2) {
                    logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[]{flowFile, e2, e2});
                    session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
                    continue;
                }
            }
            if (!signalBuffers.containsKey(signalId2)) {
                signalBuffers.put(signalId2, new SignalBuffer(this));
            }
            SignalBuffer signalBuffer2 = (SignalBuffer)signalBuffers.get(signalId2);
            if (StringUtils.isNotEmpty((CharSequence)attributeCacheRegex)) {
                flowFile.getAttributes().entrySet().stream().filter(e -> !((String)e.getKey()).equals("uuid") && ((String)e.getKey()).matches(attributeCacheRegex)).forEach(e -> signalBuffer.attributesToCache.put((String)e.getKey(), (String)e.getValue()));
            }
            signalBuffer2.incrementDelta(counterName, delta);
            signalBuffer2.flowFiles.add(flowFile);
            if (!logger.isDebugEnabled()) continue;
            logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[]{signalId2, counterName, flowFile});
        }
        signalBuffers.forEach((signalId, signalBuffer) -> {
            try {
                protocol.notify((String)signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
                signalBuffer.flowFiles.forEach(flowFile -> session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS));
            }
            catch (IOException e) {
                throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e);
            }
        });
    }

    private class SignalBuffer {
        final Map<String, Integer> deltas = new HashMap<String, Integer>();
        final Map<String, String> attributesToCache = new HashMap<String, String>();
        final List<FlowFile> flowFiles = new ArrayList<FlowFile>();

        private SignalBuffer(Notify notify) {
        }

        int incrementDelta(String counterName, int delta) {
            int current = this.deltas.getOrDefault(counterName, 0);
            int updated = delta == 0 ? 0 : current + delta;
            this.deltas.put(counterName, updated);
            return updated;
        }
    }
}

