/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.datastore;

import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RampupThrottlingFn<T>
extends DoFn<T, T>
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(RampupThrottlingFn.class);
    private static final double BASE_BUDGET = 500.0;
    private static final Duration RAMP_UP_INTERVAL = Duration.standardMinutes((long)5L);
    private static final FluentBackoff fluentBackoff = FluentBackoff.DEFAULT;
    private final int numWorkers;
    @VisibleForTesting
    Counter throttlingMsecs = Metrics.counter(RampupThrottlingFn.class, (String)"throttling-msecs");
    private transient MovingFunction successfulOps;
    private Instant firstInstant;
    @VisibleForTesting
    transient Sleeper sleeper;

    public RampupThrottlingFn(int numWorkers) {
        this.numWorkers = numWorkers;
        this.sleeper = Sleeper.DEFAULT;
        this.successfulOps = new MovingFunction(Duration.standardSeconds((long)1L).getMillis(), Duration.standardSeconds((long)1L).getMillis(), 1, 1, Sum.ofLongs());
        this.firstInstant = Instant.now();
    }

    private int calcMaxOpsBudget(Instant first, Instant instant) {
        double rampUpIntervalMinutes = RAMP_UP_INTERVAL.getStandardMinutes();
        Duration durationSinceFirst = new Duration((ReadableInstant)first, (ReadableInstant)instant);
        double calculatedGrowth = ((double)durationSinceFirst.getStandardMinutes() - rampUpIntervalMinutes) / rampUpIntervalMinutes;
        double growth = Math.max(0.0, calculatedGrowth);
        double maxOpsBudget = 500.0 / (double)this.numWorkers * Math.pow(1.5, growth);
        return (int)Math.min(2.147483647E9, Math.max(1.0, maxOpsBudget));
    }

    @DoFn.Setup
    public void setup() {
        this.sleeper = Sleeper.DEFAULT;
        this.successfulOps = new MovingFunction(Duration.standardSeconds((long)1L).getMillis(), Duration.standardSeconds((long)1L).getMillis(), 1, 1, Sum.ofLongs());
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext c) throws IOException, InterruptedException {
        Instant nonNullableFirstInstant = this.firstInstant;
        Object element = c.element();
        BackOff backoff = fluentBackoff.backoff();
        while (true) {
            Instant instant = Instant.now();
            int maxOpsBudget = this.calcMaxOpsBudget(nonNullableFirstInstant, instant);
            long currentOpCount = this.successfulOps.get(instant.getMillis());
            long availableOps = (long)maxOpsBudget - currentOpCount;
            if (maxOpsBudget >= Integer.MAX_VALUE || availableOps > 0L) {
                c.output(element);
                this.successfulOps.add(instant.getMillis(), 1L);
                return;
            }
            long backoffMillis = backoff.nextBackOffMillis();
            LOG.info("Delaying by {}ms to conform to gradual ramp-up.", (Object)backoffMillis);
            this.throttlingMsecs.inc(backoffMillis);
            this.sleeper.sleep(backoffMillis);
        }
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        builder.add(DisplayData.item((String)"hintNumWorkers", (Integer)this.numWorkers).withLabel("Number of workers for ramp-up throttling algorithm"));
    }
}

