/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;

@SupportsBatching
@Tags(value={"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Routes incoming FlowFiles to the 'wait' relationship until a matching release signal is stored in the cache from a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. The release signal entry is then removed from the cache. The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex property of the corresponding Notify processor is set properly. If there are multiple release signals in the cache identified by the Release Signal Identifier, and the Notify processor is configured to copy the FlowFile attributes to the cache, then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles that produced the release signals in the cache (identified by Release Signal Identifier). Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor.It is recommended to use a prioritizer (for instance First In First Out) when using the 'wait' relationship as a loop.")
@WritesAttributes(value={@WritesAttribute(attribute="wait.start.timestamp", description="All FlowFiles will have an attribute 'wait.start.timestamp', which sets the initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile.  This attribute is not written when the FlowFile is transferred to failure, expired or success"), @WritesAttribute(attribute="wait.counter.<counterName>", description="The name of each counter for which at least one signal has been present in the cache since the last time the cache was empty gets copied to the current FlowFile as an attribute.")})
@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.MapCacheClientService", "org.apache.nifi.distributed.cache.server.map.MapCacheServer", "org.apache.nifi.processors.standard.Notify"})
public class Wait
extends AbstractProcessor {
    // Name of the FlowFile attribute holding the epoch-millisecond time at which the FlowFile
    // first entered this processor; onTrigger compares it with Expiration Duration.
    public static final String WAIT_START_TIMESTAMP = "wait.start.timestamp";

    // ---- Property descriptors -------------------------------------------------------------

    // Controller service used to read and update release signals.
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("The Controller Service that is used to check for release signals from a corresponding Notify processor").required(true).identifiesControllerService(AtomicDistributedMapCacheClient.class).build();
    // Cache key of the release signal; evaluated per FlowFile.
    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder().name("Release Signal Identifier").description("A value that specifies the key to a specific release signal cache. To decide whether the FlowFile that is being processed by the Wait processor should be sent to the 'success' or the 'wait' relationship, the processor checks the signals in the cache specified by this key.").required(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    // Number of signals required before a FlowFile may be released; evaluated per FlowFile.
    public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder().name("Target Signal Count").description("The number of signals that need to be in the cache (specified by the Release Signal Identifier) in order for the FlowFile processed by the Wait processor to be sent to the \u2018success\u2019 relationship. If the number of signals in the cache has reached this number, the FlowFile is routed to the 'success' relationship and the number of signals in the cache is decreased by this value. If Signal Counter Name is specified, this processor checks a particular counter, otherwise checks against the total number of signals in the cache.").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("1").build();
    // Optional counter to check instead of the total signal count.
    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder().name("Signal Counter Name").description("Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. If not specified, the processor checks the total number of signals in the cache.").required(false).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    // Upper bound on FlowFiles accepted by the filter in a single onTrigger call.
    public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder().name("Wait Buffer Count").description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. The more buffer can provide the better performance, as it reduces the number of interactions with cache service by grouping FlowFiles by signal identifier. Only a signal identifier can be processed at a processor execution.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("1").build();
    // How many FlowFiles may be released per reached target count; 0 selects the "release all" branch in onTrigger.
    public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder().name("Releasable FlowFile Count").description("A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the releasable FlowFile count. This specifies how many FlowFiles can be released when a target count reaches target signal count. Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("1").build();
    // Maximum waiting time before a FlowFile is routed to 'expired'; no expression language.
    public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder().name("Expiration Duration").description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship").required(true).defaultValue("10 min").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    // Allowable values for ATTRIBUTE_COPY_MODE.
    public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present", "When cached attributes are copied onto released FlowFiles, they replace any matching attributes.");
    public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original", "Attributes on released FlowFiles are not overwritten by copied cached attributes.");
    // Selects whether cached signal attributes overwrite existing FlowFile attributes (default: keep original).
    public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder().name("Attribute Copy Mode").description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor").defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()).required(true).allowableValues(new DescribedValue[]{ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL}).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    // Allowable values for WAIT_MODE.
    public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new AllowableValue("wait", "Transfer to wait relationship", "Transfer a FlowFile to the 'wait' relationship when whose release signal has not been notified yet. This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship. It is recommended to set a prioritizer (for instance First In First Out) on the 'wait' relationship.");
    public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new AllowableValue("keep", "Keep in the upstream connection", "Transfer a FlowFile to the upstream connection where it comes from when whose release signal has not been notified yet. This mode helps keeping upstream connection being full so that the upstream source processor will not be scheduled while back-pressure is active and limit incoming FlowFiles. ");
    // Selects where still-waiting FlowFiles go: the 'wait' relationship (default) or back to the upstream connection.
    public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder().name("Wait Mode").description("Specifies how to handle a FlowFile waiting for a notify signal").defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue()).required(true).allowableValues(new DescribedValue[]{WAIT_MODE_TRANSFER_TO_WAIT, WAIT_MODE_KEEP_IN_UPSTREAM}).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    // Optional; when set, a signal id that released nothing in a trigger is skipped for this long.
    public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder().name("Wait Penalty Duration").description("If configured, after a signal identifier got processed but did not meet the release criteria, the signal identifier is penalized and FlowFiles having the signal identifier will not be processed again for the specified period of time, so that the signal identifier will not block others to be processed. This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers, and each signal identifier has multiple FlowFiles, and also the order of releasing FlowFiles is important within a signal identifier. The FlowFile order can be configured with Prioritizers. IMPORTANT: There is a limitation of number of queued signals can be processed, and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.").required(false).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    // Immutable list handed out by getSupportedPropertyDescriptors().
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RELEASE_SIGNAL_IDENTIFIER, TARGET_SIGNAL_COUNT, SIGNAL_COUNTER_NAME, WAIT_BUFFER_COUNT, RELEASABLE_FLOWFILE_COUNT, EXPIRATION_DURATION, DISTRIBUTED_CACHE_SERVICE, ATTRIBUTE_COPY_MODE, WAIT_MODE, WAIT_PENALTY_DURATION);

    // ---- Relationships --------------------------------------------------------------------

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile with a matching release signal in the cache will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship").build();
    public static final Relationship REL_WAIT = new Relationship.Builder().name("wait").description("A FlowFile with no matching release signal in the cache will be routed to this relationship").build();
    public static final Relationship REL_EXPIRED = new Relationship.Builder().name("expired").description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship").build();
    // Immutable set handed out by getRelationships().
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_WAIT, REL_EXPIRED, REL_FAILURE);

    // Signal id -> epoch-millisecond time until which FlowFiles carrying that id are rejected by
    // the onTrigger filter. Entries are added in onTrigger, pruned at the start of each onTrigger,
    // and cleared in onStopped.
    // NOTE(review): this is a plain HashMap with no synchronization — confirm the processor is
    // never triggered by more than one thread at a time.
    private final Map<String, Long> signalIdPenalties = new HashMap<String, Long>();

    /**
     * Returns the fixed, immutable list of properties this processor supports.
     *
     * @return the shared {@code PROPERTY_DESCRIPTORS} list
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the fixed, immutable set of relationships this processor can route to.
     *
     * @return the shared {@code RELATIONSHIPS} set (success, wait, expired, failure)
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Processes one batch of queued FlowFiles that all share a single release signal identifier.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Drop penalties whose deadline has passed.</li>
     *   <li>Pull up to Wait Buffer Count FlowFiles from the session. The first non-penalized
     *       signal id seen becomes the batch target; FlowFiles with another id or a penalized id
     *       are rejected and stay queued. FlowFiles whose signal id is blank are accepted only so
     *       that they can be penalized and routed to 'failure'.</li>
     *   <li>Fetch the signal for the target id from the cache.</li>
     *   <li>For each FlowFile: stamp {@code wait.start.timestamp} if absent, route to 'expired'
     *       when the Expiration Duration is exceeded, route to 'wait' when no signal exists, and
     *       otherwise add it to the release candidates. Counter name, target count and releasable
     *       count are taken from the first candidate.</li>
     *   <li>Let the signal decide which candidates are released (or, when the releasable count
     *       is 0, release all candidates once the target count is reached).</li>
     *   <li>Transfer every FlowFile, optionally penalize the signal id, and finally complete or
     *       replace the signal in the cache.</li>
     * </ol>
     *
     * @param context provides the configured property values and the cache controller service
     * @param session session used to fetch, modify and transfer FlowFiles
     * @throws ProcessException if the signal cannot be fetched from, or updated in, the cache
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
        final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();

        // FlowFiles grouped by the relationship they will be transferred to at the end.
        final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
        final Function<Relationship, List<FlowFile>> getFlowFilesFor =
                r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());

        final AtomicReference<String> targetSignalId = new AtomicReference<>();
        final AtomicInteger bufferedCount = new AtomicInteger(0);
        final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
        // Stops the filter as soon as the configured buffer size has been reached.
        final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
                () -> bufferedCount.incrementAndGet() == bufferCount
                        ? FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE
                        : FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;

        // Clear expired penalties.
        if (!signalIdPenalties.isEmpty()) {
            final long pruneTime = System.currentTimeMillis();
            signalIdPenalties.values().removeIf(penalizedUntil -> penalizedUntil < pruneTime);
        }

        final List<FlowFile> flowFiles = session.get(f -> {
            final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();

            if (StringUtils.isBlank(fSignalId)) {
                // A FlowFile cannot be penalized before it has been obtained from the session,
                // so accept it here and fail it right after the filter has run.
                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", f);
                failedFilteringFlowFiles.add(f);
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }

            if (signalIdPenalties.containsKey(fSignalId)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("FlowFile {} with signalId='{}' is currently penalized until {}", f, fSignalId, signalIdPenalties.get(fSignalId));
                }
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }

            final String targetSignalIdStr = targetSignalId.get();
            if (targetSignalIdStr == null) {
                // First usable FlowFile decides which signal id this trigger works on.
                targetSignalId.set(fSignalId);
                if (logger.isDebugEnabled()) {
                    logger.debug("Selecting signalId='{}' for this trigger batch", fSignalId);
                }
                return acceptResultSupplier.get();
            }

            if (targetSignalIdStr.equals(fSignalId)) {
                return acceptResultSupplier.get();
            }

            if (logger.isDebugEnabled()) {
                logger.debug("FlowFile {} uses different signalId='{}' and is skipped for this trigger batch targeting '{}'", f, fSignalId, targetSignalIdStr);
            }
            return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
        });

        final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
        final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
        final AtomicReference<WaitNotifyProtocol.Signal> signalRef = new AtomicReference<>();
        // Counts as fetched from the cache, captured before releaseCandidates consumes them.
        final Map<String, Long> originalSignalCounts = new HashMap<>();

        final Consumer<FlowFile> transferToFailure = failed -> {
            final FlowFile penalized = session.penalize(failed);
            // Tracking of this FlowFile is over, so drop the wait timer attribute.
            final FlowFile cleared = clearWaitState(session, penalized);
            getFlowFilesFor.apply(REL_FAILURE).add(cleared);
        };

        final Consumer<Map.Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
            Relationship relationship = routedFlowFiles.getKey();
            if (REL_WAIT.equals(relationship)) {
                final String waitMode = context.getProperty(WAIT_MODE).getValue();
                if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
                    // Put the FlowFile back into the connection it came from.
                    relationship = Relationship.SELF;
                }
            }

            final Relationship finalRelationship = relationship;
            final boolean leavesWait = REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship);
            final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
                    .map(f -> {
                        // FlowFiles that exit the wait loop no longer need the timer attribute.
                        final FlowFile prepared = leavesWait ? clearWaitState(session, f) : f;
                        return copySignalAttributes(session, prepared, signalRef.get(), originalSignalCounts, replaceOriginalAttributes);
                    })
                    .collect(Collectors.toList());
            session.transfer(flowFilesWithSignalAttributes, relationship);
        };

        failedFilteringFlowFiles.forEach(f -> {
            flowFiles.remove(f);
            transferToFailure.accept(f);
        });

        if (flowFiles.isEmpty()) {
            // Nothing but filter failures (or nothing at all): transfer those and stop.
            processedFlowFiles.entrySet().forEach(transferFlowFiles);
            return;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Processing {} FlowFile(s) for signalId='{}'", flowFiles.size(), targetSignalId.get());
        }

        final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
        final String signalId = targetSignalId.get();

        final WaitNotifyProtocol.Signal signal;
        try {
            signal = protocol.getSignal(signalId);
            if (logger.isDebugEnabled()) {
                if (signal == null) {
                    logger.debug("No signal found in cache for signalId='{}'", signalId);
                } else {
                    logger.debug("Fetched signal for signalId='{}': counts={} attributesKeys={} releasableCount={}", signalId, signal.getCounts(), signal.getAttributes().keySet(), signal.getReleasableCount());
                }
            }
            if (signal != null) {
                originalSignalCounts.putAll(signal.getCounts());
            }
            signalRef.set(signal);
        } catch (IOException e) {
            throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
        }

        String targetCounterName = null;
        long targetCount = 1L;
        int releasableFlowFileCount = 1;
        final List<FlowFile> candidates = new ArrayList<>();

        // Expiration Duration does not support expression language, so one lookup serves the batch.
        final long expirationDuration = context.getProperty(EXPIRATION_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);

        for (FlowFile flowFile : flowFiles) {
            // Stamp the wait start time the first time the FlowFile is seen.
            String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
            if (waitStartTimestamp == null) {
                waitStartTimestamp = String.valueOf(System.currentTimeMillis());
                flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
            }

            final long lWaitStartTimestamp;
            try {
                lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
            } catch (NumberFormatException nfe) {
                logger.error("{} has an invalid value '{}' on FlowFile {}", WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile);
                transferToFailure.accept(flowFile);
                continue;
            }

            final long now = System.currentTimeMillis();
            if (now > lWaitStartTimestamp + expirationDuration) {
                logger.info("FlowFile {} expired after {}ms", flowFile, now - lWaitStartTimestamp);
                getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
                continue;
            }

            // Without a signal there is nothing to evaluate; keep waiting.
            if (signal == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("No release signal found for {} on FlowFile {} yet", signalId, flowFile);
                }
                getFlowFilesFor.apply(REL_WAIT).add(flowFile);
                continue;
            }

            // Counter name and counts are fixed by the first FlowFile that becomes a candidate.
            if (candidates.isEmpty()) {
                targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
                try {
                    targetCount = Long.parseLong(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
                } catch (NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse targetCount when processing {} due to {}", flowFile, e, e);
                    continue;
                }
                try {
                    releasableFlowFileCount = Integer.parseInt(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
                } catch (NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", flowFile, e, e);
                    continue;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Using targetCounterName='{}' targetCount={} releasableFlowFileCount={} for signalId='{}'", targetCounterName, targetCount, releasableFlowFileCount, signalId);
                }
            }

            candidates.add(flowFile);
        }

        boolean waitCompleted = false;
        boolean waitProgressed = false;
        if (signal != null && !candidates.isEmpty()) {
            if (releasableFlowFileCount > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Evaluating releaseCandidates for signalId='{}' counts={} targetCounterName='{}' targetCount={} releasableFlowFileCount={} existingReleasableCount={} candidates={}", signalId, signal.getCounts(), targetCounterName, targetCount, releasableFlowFileCount, signal.getReleasableCount(), candidates.size());
                }
                signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
                        released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
                        waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
                waitCompleted = signal.getTotalCount() == 0L && signal.getReleasableCount() == 0L;
                final List<FlowFile> releasedFlowFiles = getFlowFilesFor.apply(REL_SUCCESS);
                waitProgressed = !releasedFlowFiles.isEmpty();
                if (logger.isDebugEnabled()) {
                    // Read-only lookup: logging must not create routing entries as a side effect.
                    final int waitingCount = processedFlowFiles.getOrDefault(REL_WAIT, List.of()).size();
                    logger.debug("Release evaluation result for signalId='{}': released={} waiting={} remainingCounts={} remainingReleasable={}", signalId, releasedFlowFiles.size(), waitingCount, signal.getCounts(), signal.getReleasableCount());
                }
            } else {
                // Releasable count 0: release every candidate once the target is reached,
                // without consuming any signal counts.
                final boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
                        ? signal.isTotalCountReached(targetCount)
                        : signal.isCountReached(targetCounterName, targetCount);
                if (reachedTargetCount) {
                    getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
                } else {
                    getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Zero-release-count mode for signalId='{}': reachedTargetCount={} candidates={} routedToSuccess={} routedToWait={}", signalId, reachedTargetCount, candidates.size(), reachedTargetCount ? candidates.size() : 0, reachedTargetCount ? 0 : candidates.size());
                }
            }
        }

        processedFlowFiles.entrySet().forEach(transferFlowFiles);
        if (logger.isDebugEnabled() && !processedFlowFiles.isEmpty()) {
            processedFlowFiles.forEach((rel, list) -> logger.debug("Routing {} FlowFile(s) to relationship '{}' for signalId='{}'", list.size(), rel.getName(), signalId));
        }

        // Penalize this signal id if nothing was released, so other ids get a turn.
        final PropertyValue waitPenaltyDuration = context.getProperty(WAIT_PENALTY_DURATION);
        if (waitPenaltyDuration.isSet() && getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
            final long penaltyUntil = System.currentTimeMillis() + waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS);
            signalIdPenalties.put(signalId, penaltyUntil);
            if (logger.isDebugEnabled()) {
                logger.debug("Penalizing signalId='{}' until {} because no FlowFile was released this trigger", signalId, penaltyUntil);
            }
        }

        // Persist the consumed signal state; roll the session back if the cache is unreachable.
        try {
            if (waitCompleted) {
                protocol.complete(signalId);
                if (logger.isDebugEnabled()) {
                    logger.debug("Completed wait for signalId='{}' and removed signal from cache", signalId);
                }
            } else if (waitProgressed) {
                protocol.replace(signal);
                if (logger.isDebugEnabled()) {
                    logger.debug("Updated signal in cache for signalId='{}' with counts={} releasableCount={}", signalId, signal.getCounts(), signal.getReleasableCount());
                }
            }
        } catch (IOException e) {
            session.rollback();
            throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
        }
    }

    /**
     * Renames the legacy hyphenated property keys to the current display-style property names.
     *
     * @param config the property configuration on which each rename is applied
     */
    @Override
    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("distributed-cache-service", DISTRIBUTED_CACHE_SERVICE.getName());
        config.renameProperty("release-signal-id", RELEASE_SIGNAL_IDENTIFIER.getName());
        config.renameProperty("target-signal-count", TARGET_SIGNAL_COUNT.getName());
        config.renameProperty("signal-counter-name", SIGNAL_COUNTER_NAME.getName());
        config.renameProperty("wait-buffer-count", WAIT_BUFFER_COUNT.getName());
        config.renameProperty("releasable-flowfile-count", RELEASABLE_FLOWFILE_COUNT.getName());
        config.renameProperty("expiration-duration", EXPIRATION_DURATION.getName());
        config.renameProperty("attribute-copy-mode", ATTRIBUTE_COPY_MODE.getName());
        config.renameProperty("wait-mode", WAIT_MODE.getName());
        config.renameProperty("wait-penalty-duration", WAIT_PENALTY_DURATION.getName());
    }

    /**
     * Removes the {@code wait.start.timestamp} attribute from a FlowFile that has stopped waiting.
     *
     * @param session session used to modify the FlowFile
     * @param flowFile the FlowFile whose wait timer attribute should be dropped
     * @return the FlowFile returned by the session after the attribute removal
     */
    private FlowFile clearWaitState(ProcessSession session, FlowFile flowFile) {
        final FlowFile withoutWaitStart = session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
        return withoutWaitStart;
    }

    /**
     * Copies the attributes cached on a signal, plus {@code wait.counter.*} attributes, onto a FlowFile.
     *
     * <p>When {@code replaceOriginal} is true every cached attribute except {@code uuid} is
     * copied and overwrites any attribute of the same name. Otherwise only cached attributes
     * the FlowFile does not already have are copied. In both modes one
     * {@code wait.counter.<name>} attribute per entry of {@code originalCount} and a
     * {@code wait.counter.total} attribute holding their sum are written.
     *
     * @param session session used to modify the FlowFile
     * @param flowFile the FlowFile to decorate
     * @param signal the signal whose attributes are copied; when {@code null} the FlowFile is returned untouched
     * @param originalCount counter name to count, as captured before any counts were consumed
     * @param replaceOriginal whether cached attributes may overwrite existing FlowFile attributes
     * @return the FlowFile returned by the session, or the given FlowFile when there is no signal
     */
    private FlowFile copySignalAttributes(ProcessSession session, FlowFile flowFile, WaitNotifyProtocol.Signal signal, Map<String, Long> originalCount, boolean replaceOriginal) {
        if (signal == null) {
            return flowFile;
        }

        final Map<String, String> attributesToCopy;
        if (replaceOriginal) {
            attributesToCopy = new HashMap<>(signal.getAttributes());
            // Never overwrite the identity of the waiting FlowFile.
            attributesToCopy.remove("uuid");
        } else {
            attributesToCopy = signal.getAttributes().entrySet().stream()
                    .filter(e -> flowFile.getAttribute(e.getKey()) == null)
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        // Plain loop instead of a stream: the map is mutated while the total is accumulated.
        long totalCount = 0L;
        for (final Map.Entry<String, Long> counter : originalCount.entrySet()) {
            final long count = counter.getValue();
            attributesToCopy.put("wait.counter." + counter.getKey(), String.valueOf(count));
            totalCount += count;
        }
        attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));

        return session.putAllAttributes(flowFile, attributesToCopy);
    }

    /**
     * Discards every recorded signal id penalty when the processor is stopped.
     *
     * @param context the process context supplied by the framework; not used here
     */
    @OnStopped
    public void onStopped(ProcessContext context) {
        signalIdPenalties.clear();
    }

    /**
     * Package-private accessor for the penalty table.
     *
     * @return the live (not copied) map of signal id to penalty deadline in epoch milliseconds
     */
    Map<String, Long> getSignalIdPenalties() {
        return signalIdPenalties;
    }
}

