/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.documentation.UseCases;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.timebuffer.EntityAccess;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;

@SideEffectFree
@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"rate control", "throttle", "rate", "throughput"})
@CapabilityDescription(value="Controls the rate at which data is transferred to follow-on processors. If you configure a very small Time Duration, then the accuracy of the throttle gets worse. You can improve this accuracy by decreasing the Yield Duration, at the expense of more Tasks given to the processor.")
@UseCases(value={@UseCase(description="Limit the rate at which data is sent to a downstream system with little to no bursts", keywords={"throttle", "limit", "slow down", "data rate"}, configuration="Set the \"Rate Control Criteria\" to `data rate`.\nSet the \"Time Duration\" property to `1 sec`.\nConfigure the \"Maximum Rate\" property to specify how much data should be allowed through each second.\n\nFor example, to allow through 8 MB per second, set \"Maximum Rate\" to `8 MB`.\n"), @UseCase(description="Limit the rate at which FlowFiles are sent to a downstream system with little to no bursts", keywords={"throttle", "limit", "slow down", "flowfile rate"}, configuration="Set the \"Rate Control Criteria\" to `flowfile count`.\nSet the \"Time Duration\" property to `1 sec`.\nConfigure the \"Maximum Rate\" property to specify how many FlowFiles should be allowed through each second.\n\nFor example, to allow through 100 FlowFiles per second, set \"Maximum Rate\" to `100`.\n"), @UseCase(description="Reject requests that exceed a specific rate with little to no bursts", keywords={"throttle", "limit", "slow down", "request rate"}, configuration="Set the \"Rate Control Criteria\" to `flowfile count`.\nSet the \"Time Duration\" property to `1 sec`.\nSet the \"Rate Exceeded Strategy\" property to `Route to 'rate exceeded'`.\nConfigure the \"Maximum Rate\" property to specify how many requests should be allowed through each second.\n\nFor example, to allow through 100 requests per second, set \"Maximum Rate\" to `100`.\nIf more than 100 requests come in during any one second, the additional requests will be routed to `rate exceeded` instead of `success`.\n"), @UseCase(description="Reject requests that exceed a specific rate, allowing for bursts", keywords={"throttle", "limit", "slow down", "request rate"}, configuration="Set the \"Rate Control Criteria\" to `flowfile count`.\nSet the \"Time Duration\" property to `1 min`.\nSet the \"Rate Exceeded Strategy\" property to 
`Route to 'rate exceeded'`.\nConfigure the \"Maximum Rate\" property to specify how many requests should be allowed through each minute.\n\nFor example, to allow through 100 requests per second, set \"Maximum Rate\" to `6000`.\nThis will allow through 6,000 FlowFiles per minute, which averages to 100 FlowFiles per second. However, those 6,000 FlowFiles may come all within the first couple of\nseconds, or they may come in over a period of 60 seconds. As a result, this gives us an average rate of 100 FlowFiles per second but allows for bursts of data.\nIf more than 6,000 requests come in during any one minute, the additional requests will be routed to `rate exceeded` instead of `success`.\n")})
public class ControlRate
extends AbstractProcessor {
    // Allowable values for the "Rate Exceeded Strategy" property.
    static final AllowableValue HOLD_FLOWFILE = new AllowableValue("Hold FlowFile", "Hold FlowFile", "The FlowFile will be held in its input queue until the rate of data has fallen below the configured maximum and will then be allowed through.");
    static final AllowableValue ROUTE_TO_RATE_EXCEEDED = new AllowableValue("Route to 'rate exceeded'", "Route to 'rate exceeded'", "The FlowFile will be routed to the 'rate exceeded' Relationship.");
    // Upper bound on FlowFiles handled per trigger; the same value (1000) is what the routing/holding methods pass to session.get and ThrottleFilter.
    public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
    // NOTE(review): not referenced by name in this decompiled source; getCountAccrual yields the same -1 for FlowFiles that cannot accrue a count (presumably this constant was inlined by the compiler).
    private static final long DEFAULT_ACCRUAL_COUNT = -1L;
    // Selects which quantity is throttled; all rate-limit properties below depend on this one.
    public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder().name("Rate Control Criteria").description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.").required(true).allowableValues(RateControlCriteria.class).defaultValue((DescribedValue)RateControlCriteria.DATA_RATE).build();
    // Single limit used for the 'data rate', 'flowfile count' and 'attribute value' criteria; the type-specific format is checked in customValidate.
    public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder().name("Maximum Rate").description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dependsOn(RATE_CONTROL_CRITERIA, (DescribedValue)RateControlCriteria.DATA_RATE, new DescribedValue[]{RateControlCriteria.FLOWFILE_RATE, RateControlCriteria.ATTRIBUTE_RATE}).build();
    // Byte limit, only applicable to the combined 'data rate or flowfile count' criteria.
    public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder().name("Maximum Data Rate").description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a Data Size (such as '1 MB') representing bytes per Time Duration.").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).dependsOn(RATE_CONTROL_CRITERIA, (DescribedValue)RateControlCriteria.DATA_OR_FLOWFILE_RATE, new DescribedValue[0]).build();
    // FlowFile-count limit, only applicable to the combined 'data rate or flowfile count' criteria.
    public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder().name("Maximum FlowFile Rate").description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a positive integer representing FlowFiles count per Time Duration").required(false).addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR).dependsOn(RATE_CONTROL_CRITERIA, (DescribedValue)RateControlCriteria.DATA_OR_FLOWFILE_RATE, new DescribedValue[0]).build();
    // Chooses between holding FlowFiles in the queue (default) and routing them to 'rate exceeded'; also drives the relationship set in onPropertyModified.
    public static final PropertyDescriptor RATE_EXCEEDED_STRATEGY = new PropertyDescriptor.Builder().name("Rate Exceeded Strategy").description("Specifies how to handle an incoming FlowFile when the maximum data rate has been exceeded.").required(true).allowableValues(new DescribedValue[]{HOLD_FLOWFILE, ROUTE_TO_RATE_EXCEEDED}).defaultValue(HOLD_FLOWFILE.getValue()).build();
    // Name of the attribute whose numeric value is accrued; only applicable to the 'attribute value' criteria.
    public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("Rate Controlled Attribute").description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).dependsOn(RATE_CONTROL_CRITERIA, (DescribedValue)RateControlCriteria.ATTRIBUTE_RATE, new DescribedValue[0]).build();
    // Length of the throttling window; validated to lie between 1 second and Integer.MAX_VALUE seconds.
    public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder().name("Time Duration").description("The amount of time to which the Maximum Rate pertains. Changing this value resets the rate counters.").required(true).addValidator(StandardValidators.createTimePeriodValidator((long)1L, (TimeUnit)TimeUnit.SECONDS, (long)Integer.MAX_VALUE, (TimeUnit)TimeUnit.SECONDS)).defaultValue("1 min").build();
    // Optional attribute whose value selects a per-group throttle instead of the single shared one.
    public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("Grouping Attribute").description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for each value specified by the attribute with this name. Changing this value resets the rate counters.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    // Immutable list returned by getSupportedPropertyDescriptors.
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RATE_CONTROL_CRITERIA, TIME_PERIOD, MAX_RATE, MAX_DATA_RATE, MAX_COUNT_RATE, RATE_EXCEEDED_STRATEGY, RATE_CONTROL_ATTRIBUTE_NAME, GROUPING_ATTRIBUTE_NAME);
    // Relationships this processor can route to.
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles are transferred to this relationship under normal conditions").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if they are missing a necessary Rate Controlled Attribute or the attribute is not in the expected format").build();
    static final Relationship REL_RATE_EXCEEDED = new Relationship.Builder().name("rate exceeded").description("A FlowFile will be routed to this Relationship if it results in exceeding the maximum threshold allowed based on the Processor's configuration and if the Rate Exceeded Strategy is configured to use this Relationship.").build();
    // Relationship set exposed while the "Hold FlowFile" strategy is selected.
    private static final Set<Relationship> DEFAULT_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    // Relationship set exposed while the "Route to 'rate exceeded'" strategy is selected (see onPropertyModified).
    private static final Set<Relationship> RATE_EXCEEDED_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RATE_EXCEEDED);
    // Matches a decimal integer greater than zero; leading zeros are permitted. Note that the pattern places no upper bound on the number of digits.
    private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
    // Throttle-map key used when no grouping attribute is configured or a FlowFile lacks that attribute.
    private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
    // Currently exposed relationship set; replaced in onPropertyModified when the Rate Exceeded Strategy changes.
    private volatile Set<Relationship> relationships = DEFAULT_RELATIONSHIPS;
    // Per-group throttles, keyed by grouping-attribute value (or DEFAULT_GROUP_ATTRIBUTE): one map for byte limits, one for count/attribute limits.
    private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<String, Throttle>();
    private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<String, Throttle>();
    // Timestamp of the last throttle clean-up pass; read and compare-and-set in clearExpiredThrottles.
    private final AtomicLong lastThrottleClearTime = new AtomicLong(this.getCurrentTimeMillis());
    // Configuration snapshot captured in onScheduled and read while filtering FlowFiles.
    private volatile RateControlCriteria rateControlCriteria = null;
    private volatile String rateControlAttribute = null;
    private volatile String maximumRateStr = null;
    private volatile String maximumCountRateStr = null;
    private volatile String groupingAttributeName = null;
    private volatile int timePeriodSeconds = 1;

    /**
     * Returns the properties this processor supports.
     *
     * @return the shared, immutable {@code PROPERTY_DESCRIPTORS} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the relationships currently exposed by this processor.
     *
     * <p>The set is swapped by {@code onPropertyModified} whenever the Rate Exceeded Strategy
     * changes, so it may or may not contain the 'rate exceeded' relationship.
     *
     * @return the current relationship set
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Validates the rate-limit properties against the selected Rate Control Criteria.
     *
     * <ul>
     *   <li>data rate or flowfile count: "Maximum Data Rate" must be a data size and
     *       "Maximum FlowFile Rate" a positive long;</li>
     *   <li>data rate: "Maximum Rate" must be a data size;</li>
     *   <li>attribute value: the rate-controlled attribute name must be set and "Maximum Rate"
     *       must be a positive long;</li>
     *   <li>flowfile count: "Maximum Rate" must be a positive long.</li>
     * </ul>
     *
     * @param context the validation context supplying the configured property values
     * @return the results of the superclass validation followed by the criteria-specific results
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
        final RateControlCriteria criteria = (RateControlCriteria) context.getProperty(RATE_CONTROL_CRITERIA).asAllowableValue(RateControlCriteria.class);

        // Dispatch on the enum constant itself rather than on its ordinal so that reordering the
        // enum can never silently re-map the validation rules.
        switch (criteria) {
            case DATA_OR_FLOWFILE_RATE -> {
                validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate(MAX_DATA_RATE.getDisplayName(), context.getProperty(MAX_DATA_RATE).getValue(), context));
                validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate(MAX_COUNT_RATE.getDisplayName(), context.getProperty(MAX_COUNT_RATE).getValue(), context));
            }
            case DATA_RATE ->
                validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
            case ATTRIBUTE_RATE -> {
                final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
                if (rateAttr == null) {
                    validationResults.add(new ValidationResult.Builder().subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()).explanation("property must be set if using <Rate Control Criteria> of 'attribute value'").build());
                }
                // The attribute criteria shares the positive-long limit with the flowfile-count criteria.
                validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
            }
            case FLOWFILE_RATE ->
                validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
        }
        return validationResults;
    }

    /**
     * Reacts to a property change by keeping the exposed relationships and the live throttles in
     * sync with the new configuration.
     *
     * <ul>
     *   <li>Rate Exceeded Strategy: switches the relationship set.</li>
     *   <li>Rate Control Criteria, Rate Controlled Attribute, Grouping Attribute, Time Duration:
     *       discards all throttles, which resets the rate counters.</li>
     *   <li>Maximum Rate / Maximum Data Rate: pushes the new limit into the existing throttles
     *       that are required by the currently scheduled criteria.</li>
     *   <li>Maximum FlowFile Rate: pushes the new limit into the existing count throttles, using
     *       -1 when the value cannot be parsed.</li>
     * </ul>
     *
     * @param descriptor the property that changed
     * @param oldValue the previous value, possibly {@code null}
     * @param newValue the new value, possibly {@code null}; it has not necessarily been validated
     */
    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        super.onPropertyModified(descriptor, oldValue, newValue);

        if (descriptor.equals(RATE_EXCEEDED_STRATEGY)) {
            this.relationships = ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(newValue) ? RATE_EXCEEDED_RELATIONSHIPS : DEFAULT_RELATIONSHIPS;
        }

        if (descriptor.equals(RATE_CONTROL_CRITERIA)
                || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
                || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
                || descriptor.equals(TIME_PERIOD)) {
            this.dataThrottleMap.clear();
            this.countThrottleMap.clear();
        } else if (descriptor.equals(MAX_RATE) || descriptor.equals(MAX_DATA_RATE)) {
            if (newValue == null) {
                return;
            }
            final long newRate;
            try {
                newRate = DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()
                        ? DataUnit.parseDataSize(newValue, DataUnit.B).longValue()
                        : Long.parseLong(newValue);
            } catch (NumberFormatException ignored) {
                // The value is neither a data size nor a long. customValidate reports it as
                // invalid; the existing throttles keep their previous limit rather than letting
                // the exception escape from this callback.
                return;
            }
            if (this.dataThrottleRequired()) {
                for (Throttle throttle : this.dataThrottleMap.values()) {
                    throttle.setMaxRate(newRate);
                }
            }
            if (this.countThrottleRequired()) {
                for (Throttle throttle : this.countThrottleMap.values()) {
                    throttle.setMaxRate(newRate);
                }
            }
        } else if (descriptor.equals(MAX_COUNT_RATE)) {
            long newRate;
            try {
                newRate = Long.parseLong(newValue);
            } catch (NumberFormatException nfe) {
                // Also reached for a null value, which Long.parseLong rejects with this exception.
                newRate = -1L;
            }
            for (Throttle throttle : this.countThrottleMap.values()) {
                throttle.setMaxRate(newRate);
            }
        }
    }

    /**
     * Captures the configuration needed while filtering FlowFiles: the rate control criteria, the
     * rate-controlled attribute (attribute criteria only), the textual limits for whichever
     * throttles the criteria requires, the grouping attribute and the time period in seconds.
     *
     * @param context the process context supplying the configured property values
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        final RateControlCriteria criteria = (RateControlCriteria) context.getProperty(RATE_CONTROL_CRITERIA).asAllowableValue(RateControlCriteria.class);
        this.rateControlCriteria = criteria;

        if (criteria == RateControlCriteria.ATTRIBUTE_RATE) {
            this.rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
        } else {
            this.rateControlAttribute = null;
        }

        // The combined criteria reads its limits from the dedicated properties; every other
        // criteria uses the single "Maximum Rate" property.
        final boolean combinedCriteria = criteria == RateControlCriteria.DATA_OR_FLOWFILE_RATE;

        if (this.dataThrottleRequired()) {
            final PropertyDescriptor dataLimit = combinedCriteria ? MAX_DATA_RATE : MAX_RATE;
            this.maximumRateStr = context.getProperty(dataLimit).getValue().toUpperCase();
        }
        if (this.countThrottleRequired()) {
            final PropertyDescriptor countLimit = combinedCriteria ? MAX_COUNT_RATE : MAX_RATE;
            this.maximumCountRateStr = context.getProperty(countLimit).getValue();
        }

        this.groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
        this.timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
    }

    /**
     * Processes one batch of FlowFiles using the configured Rate Exceeded Strategy: FlowFiles
     * over the limit are either routed to 'rate exceeded' or left in the incoming queue.
     *
     * @param context the process context
     * @param session the session used to fetch and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String configuredStrategy = context.getProperty(RATE_EXCEEDED_STRATEGY).getValue();
        final boolean routeExcess = ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(configuredStrategy);
        if (!routeExcess) {
            this.holdFlowFilesExceedingRate(context, session);
            return;
        }
        this.routeFlowFilesExceedingRate(context, session);
    }

    /**
     * Pulls up to {@link #MAX_FLOW_FILES_PER_BATCH} FlowFiles and routes each one: to 'failure'
     * when its rate attribute is missing or invalid, to 'success' when the throttle accepts it,
     * and to 'rate exceeded' otherwise. Yields the processor when the queue is empty.
     *
     * @param context the process context
     * @param session the session used to fetch and transfer FlowFiles
     */
    private void routeFlowFilesExceedingRate(ProcessContext context, ProcessSession session) {
        this.clearExpiredThrottles(context);

        final List<FlowFile> flowFiles = session.get(MAX_FLOW_FILES_PER_BATCH);
        if (flowFiles.isEmpty()) {
            context.yield();
            return;
        }

        final ThrottleFilter filter = new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis);
        for (final FlowFile flowFile : flowFiles) {
            final Relationship relationship;
            if (!this.isRateAttributeValid(flowFile)) {
                // Invalid FlowFiles never reach the throttle, so they do not consume any rate.
                relationship = REL_FAILURE;
            } else {
                final FlowFileFilter.FlowFileFilterResult result = filter.filter(flowFile);
                relationship = result.isAccept() ? REL_SUCCESS : REL_RATE_EXCEEDED;
            }

            session.transfer(flowFile, relationship);
            this.getLogger().info("Routing {} to {}", flowFile, relationship.getName());
            session.getProvenanceReporter().route(flowFile, relationship);
        }
    }

    /**
     * Pulls only the FlowFiles the throttle filter accepts, leaving the rest in the incoming
     * queue, and transfers them to 'success', or to 'failure' when the rate attribute is missing
     * or invalid. Yields the processor when nothing was accepted.
     *
     * @param context the process context
     * @param session the session used to fetch and transfer FlowFiles
     */
    private void holdFlowFilesExceedingRate(ProcessContext context, ProcessSession session) {
        this.clearExpiredThrottles(context);

        final List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis));
        if (flowFiles.isEmpty()) {
            context.yield();
            return;
        }

        final ComponentLog logger = this.getLogger();
        for (final FlowFile flowFile : flowFiles) {
            if (this.isRateAttributeValid(flowFile)) {
                logger.info("transferring {} to 'success'", flowFile);
                session.transfer(flowFile, REL_SUCCESS);
            } else {
                // The filter lets invalid FlowFiles through so that they can be failed here.
                logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
                session.transfer(flowFile, REL_FAILURE);
            }
        }
    }

    /**
     * Evicts throttles that have gone unused, at most once every two Time Durations.
     *
     * <p>A clean-up pass runs only when the previous pass is more than two Time Durations old and
     * this caller wins the compare-and-set on the clean-up timestamp. During a pass, a throttle is
     * removed from its map when its last successful update predates the previous pass and it
     * carries no pending or active penalization.
     *
     * @param context the process context supplying the configured Time Duration
     */
    private void clearExpiredThrottles(ProcessContext context) {
        final long lastClearTime = this.lastThrottleClearTime.get();
        final long throttleExpirationMillis = this.getCurrentTimeMillis() - 2L * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
        if (lastClearTime >= throttleExpirationMillis) {
            return;
        }

        final long clearTime = this.getCurrentTimeMillis();
        if (!this.lastThrottleClearTime.compareAndSet(lastClearTime, clearTime)) {
            return;
        }

        if (this.dataThrottleRequired()) {
            this.removeIdleThrottles(this.dataThrottleMap, lastClearTime, clearTime);
        }
        if (this.countThrottleRequired()) {
            this.removeIdleThrottles(this.countThrottleMap, lastClearTime, clearTime);
        }
    }

    /**
     * Removes from {@code throttles} every entry whose throttle has not been updated since
     * {@code idleSince}, skipping throttles that are locked or still penalized.
     */
    private void removeIdleThrottles(ConcurrentMap<String, Throttle> throttles, long idleSince, long now) {
        // Entries are removed from the live map itself; removing them from a copy of the entry
        // set would leave the map untouched and let it grow with every distinct group value.
        for (final Map.Entry<String, Throttle> entry : throttles.entrySet()) {
            final Throttle throttle = entry.getValue();
            if (!throttle.tryLock()) {
                // Somebody is using this throttle right now, so it is not idle.
                continue;
            }
            try {
                final boolean idle = throttle.lastUpdateTime() < idleSince;
                // A throttle that let an oversized amount through carries a penalty that can
                // outlast the idle threshold; dropping it would forgive that penalty early.
                final boolean penalized = throttle.penalizationPeriod > 0L || throttle.penalizationExpired > now;
                if (idle && !penalized) {
                    throttles.remove(entry.getKey(), throttle);
                }
            } finally {
                throttle.unlock();
            }
        }
    }

    /**
     * Returns the current wall-clock time used for all throttle bookkeeping in this processor.
     *
     * <p>NOTE(review): presumably protected so that subclasses (e.g. tests) can substitute a
     * controlled clock - confirm against callers outside this file.
     *
     * @return the value of {@link System#currentTimeMillis()}
     */
    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Tells whether a FlowFile can be rate-controlled under the scheduled criteria.
     *
     * <p>Only the 'attribute value' criteria imposes a requirement: the rate-controlled attribute
     * must be present and hold a positive integer that fits in a {@code long}. Every other
     * criteria accepts all FlowFiles.
     *
     * @param flowFile the FlowFile to inspect
     * @return {@code true} when the FlowFile may be passed to the throttle, {@code false} when it
     *         must be routed to 'failure'
     */
    private boolean isRateAttributeValid(FlowFile flowFile) {
        if (this.rateControlCriteria != RateControlCriteria.ATTRIBUTE_RATE) {
            return true;
        }

        final String attributeValue = flowFile.getAttribute(this.rateControlAttribute);
        if (attributeValue == null || !POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
            return false;
        }

        try {
            // The pattern puts no bound on the digit count, so a value beyond Long.MAX_VALUE
            // still matches; reject it here instead of letting the later parse throw.
            Long.parseLong(attributeValue);
            return true;
        } catch (NumberFormatException ignored) {
            return false;
        }
    }

    /**
     * Returns the amount a FlowFile contributes to the data throttle.
     *
     * @param flowFile the FlowFile being considered
     * @return the value of {@code flowFile.getSize()}
     */
    private long getDataSizeAccrual(FlowFile flowFile) {
        return flowFile.getSize();
    }

    /**
     * Returns the amount a FlowFile contributes to the count throttle under the scheduled
     * criteria.
     *
     * @param flowFile the FlowFile being considered
     * @return 1 for the FlowFile-count based criteria, the parsed attribute value for the
     *         'attribute value' criteria, or {@code DEFAULT_ACCRUAL_COUNT} (-1) when no count
     *         applies or the attribute is missing, malformed or too large for a {@code long}
     */
    private long getCountAccrual(FlowFile flowFile) {
        return switch (this.rateControlCriteria) {
            case DATA_RATE -> DEFAULT_ACCRUAL_COUNT;
            case FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE -> 1L;
            case ATTRIBUTE_RATE -> {
                final String attributeValue = flowFile.getAttribute(this.rateControlAttribute);
                if (attributeValue == null || !POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
                    yield DEFAULT_ACCRUAL_COUNT;
                }
                try {
                    yield Long.parseLong(attributeValue);
                } catch (NumberFormatException ignored) {
                    // All digits but beyond Long.MAX_VALUE: treat like any other unusable value.
                    yield DEFAULT_ACCRUAL_COUNT;
                }
            }
        };
    }

    /**
     * Tells whether the scheduled criteria limits the number of bytes passing through.
     *
     * @return {@code true} for the 'data rate' and 'data rate or flowfile count' criteria;
     *         {@code false} otherwise, including before the criteria has been captured
     */
    private boolean dataThrottleRequired() {
        // Read the volatile field once so both comparisons see the same value.
        final RateControlCriteria criteria = this.rateControlCriteria;
        return criteria == RateControlCriteria.DATA_RATE
                || criteria == RateControlCriteria.DATA_OR_FLOWFILE_RATE;
    }

    /**
     * Tells whether the scheduled criteria limits a count (FlowFiles or accumulated attribute
     * values) passing through.
     *
     * @return {@code true} for the 'flowfile count', 'attribute value' and
     *         'data rate or flowfile count' criteria; {@code false} otherwise, including before
     *         the criteria has been captured
     */
    private boolean countThrottleRequired() {
        // Read the volatile field once so all comparisons see the same value.
        final RateControlCriteria criteria = this.rateControlCriteria;
        return criteria == RateControlCriteria.FLOWFILE_RATE
                || criteria == RateControlCriteria.ATTRIBUTE_RATE
                || criteria == RateControlCriteria.DATA_OR_FLOWFILE_RATE;
    }

    static enum RateControlCriteria implements DescribedValue
    {
        DATA_RATE("data rate", "Rate is controlled by counting bytes transferred per time duration."),
        FLOWFILE_RATE("flowfile count", "Rate is controlled by counting FlowFiles transferred per time duration"),
        ATTRIBUTE_RATE("attribute value", "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration"),
        DATA_OR_FLOWFILE_RATE("data rate or flowfile count", "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");

        private final String value;
        private final String description;

        private RateControlCriteria(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }

    /**
     * Tracks how much has been let through for one group and decides whether more may pass.
     *
     * <p>The throttle is itself the lock that guards it: callers in this file lock it around
     * {@link #tryAdd(long)}. When an accepted amount pushes the running total over the maximum,
     * the throttle records a penalization period proportional to the overshoot; that period
     * starts on a later {@code tryAdd} call once the running total has fallen back below the
     * maximum, and every call is rejected until it has elapsed.
     */
    private static class Throttle extends ReentrantLock {
        private final AtomicLong maxRate = new AtomicLong(1L);
        private final long timePeriodMillis;
        private final TimedBuffer<TimestampedLong> timedBuffer;
        private final ComponentLog logger;
        private final LongSupplier currentTimeSupplier;
        // Penalty length waiting to be started; 0 when none is pending.
        private volatile long penalizationPeriod = 0L;
        // Time until which every tryAdd is rejected; 0 when no penalty has been started.
        private volatile long penalizationExpired = 0L;
        // Time of the last accepted tryAdd.
        private volatile long lastUpdateTime;

        /**
         * @param timePeriod length of the throttling window, in {@code unit}
         * @param unit unit of {@code timePeriod}
         * @param logger logger for debug output
         * @param currentTimeSupplier source of the current time in milliseconds
         */
        private Throttle(int timePeriod, TimeUnit unit, ComponentLog logger, LongSupplier currentTimeSupplier) {
            this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier);
            this.logger = logger;
            this.currentTimeSupplier = currentTimeSupplier;
        }

        /** Replaces the maximum total allowed per time period. */
        public void setMaxRate(long maxRate) {
            this.maxRate.set(maxRate);
        }

        /** Returns the time of the last accepted {@link #tryAdd(long)}, or 0 if there was none. */
        public long lastUpdateTime() {
            return this.lastUpdateTime;
        }

        /**
         * Attempts to let {@code value} through.
         *
         * @param value the amount to add; negative values are always rejected
         * @return {@code true} when the amount was recorded and may pass, {@code false} when the
         *         throttle is penalized, the running total has already reached the maximum, or a
         *         pending penalty was started by this call
         */
        public boolean tryAdd(long value) {
            if (value < 0L) {
                return false;
            }

            final long now = this.currentTimeSupplier.getAsLong();
            if (this.penalizationExpired > now) {
                return false;
            }

            final long maxRateValue = this.maxRate.get();
            final TimestampedLong sum = this.timedBuffer.getAggregateValue(this.timePeriodMillis);
            if (sum != null && sum.getValue() >= maxRateValue) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", sum.getValue(), sum.getTimestamp(), value);
                }
                return false;
            }

            // The running total is back under the maximum; start any penalty left by an earlier overshoot.
            if (this.penalizationPeriod > 0L) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", this.penalizationPeriod);
                }
                this.penalizationExpired = now + this.penalizationPeriod;
                this.penalizationPeriod = 0L;
                return false;
            }

            if (this.logger.isDebugEnabled()) {
                this.logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through", sum == null ? 0L : sum.getValue(), sum == null ? 0L : sum.getTimestamp(), value);
            }

            final long transferred = this.timedBuffer.add(new TimestampedLong(value)).getValue();
            if (transferred > maxRateValue) {
                // Let this amount through, but schedule a penalty proportional to the overshoot.
                final long amountOver = transferred - maxRateValue;
                final double pct = (double) amountOver / (double) maxRateValue;
                this.penalizationPeriod = (long) ((double) this.timePeriodMillis * pct);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", value, this.penalizationPeriod);
                }
            }

            this.lastUpdateTime = now;
            return true;
        }
    }

    /**
     * FlowFile filter that consults the per-group throttles of the enclosing processor and
     * accepts at most a fixed number of FlowFiles per batch.
     */
    private class ThrottleFilter implements FlowFileFilter {
        private final int flowFilesPerBatch;
        private final LongSupplier currentTimeSupplier;
        private int flowFilesInBatch = 0;

        /**
         * @param maxFFPerBatch number of accepted FlowFiles after which the filter terminates
         * @param currentTimeSupplier clock handed to any throttle this filter creates
         */
        ThrottleFilter(int maxFFPerBatch, LongSupplier currentTimeSupplier) {
            this.flowFilesPerBatch = maxFFPerBatch;
            this.currentTimeSupplier = currentTimeSupplier;
        }

        /**
         * Decides whether a FlowFile may pass.
         *
         * <p>A FlowFile with an invalid rate attribute is accepted (terminating the batch) so the
         * processor can route it to failure. Otherwise the data throttle is consulted first when
         * required, then the count throttle when required and the data throttle did not reject.
         * A rejection terminates filtering when no grouping attribute is configured and continues
         * otherwise.
         *
         * @param flowFile the candidate FlowFile
         * @return the accept/reject decision together with whether filtering should continue
         */
        public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
            if (!isRateAttributeValid(flowFile)) {
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
            }

            final String groupName = resolveGroupName(flowFile);
            Throttle dataThrottle = dataThrottleMap.get(groupName);
            Throttle countThrottle = countThrottleMap.get(groupName);

            boolean rejectedByDataThrottle = false;
            if (dataThrottleRequired()) {
                if (dataThrottle == null) {
                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), this.currentTimeSupplier);
                    dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
                    dataThrottleMap.put(groupName, dataThrottle);
                }

                dataThrottle.lock();
                try {
                    if (!dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
                        rejectedByDataThrottle = true;
                    } else if (acceptedFlowFileFillsBatch()) {
                        return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
                    } else if (!countThrottleRequired()) {
                        return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                    }
                    // Otherwise fall through so the count throttle gets its say as well.
                } finally {
                    dataThrottle.unlock();
                }
            }

            if (countThrottleRequired() && !rejectedByDataThrottle) {
                if (countThrottle == null) {
                    countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), this.currentTimeSupplier);
                    countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
                    countThrottleMap.put(groupName, countThrottle);
                }

                countThrottle.lock();
                try {
                    if (countThrottle.tryAdd(getCountAccrual(flowFile))) {
                        return acceptedFlowFileFillsBatch()
                                ? FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE
                                : FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                    }
                } finally {
                    countThrottle.unlock();
                }
            }

            // Without grouping every remaining FlowFile shares the same exhausted throttle, so
            // there is no point looking further; with grouping another group may still have room.
            return groupingAttributeName == null
                    ? FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE
                    : FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
        }

        /** Returns the throttle key for a FlowFile, falling back to the default group. */
        private String resolveGroupName(FlowFile flowFile) {
            final String attributeName = groupingAttributeName;
            if (attributeName == null) {
                return DEFAULT_GROUP_ATTRIBUTE;
            }
            final String attributeValue = flowFile.getAttribute(attributeName);
            return attributeValue == null ? DEFAULT_GROUP_ATTRIBUTE : attributeValue;
        }

        /**
         * Counts one accepted FlowFile and reports whether the batch limit has been reached,
         * resetting the counter when it has.
         */
        private boolean acceptedFlowFileFillsBatch() {
            ++this.flowFilesInBatch;
            if (this.flowFilesInBatch < this.flowFilesPerBatch) {
                return false;
            }
            this.flowFilesInBatch = 0;
            return true;
        }
    }
}

