/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.driver.internal.core.util.concurrent;

import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.netty.util.concurrent.EventExecutor;
import com.datastax.oss.driver.shaded.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class Debouncer<IncomingT, CoalescedT> {
    private static final Logger LOG = LoggerFactory.getLogger(Debouncer.class);
    private final String logPrefix;
    private final EventExecutor adminExecutor;
    private final Consumer<CoalescedT> onFlush;
    private final Duration window;
    private final long maxEvents;
    private final Function<List<IncomingT>, CoalescedT> coalescer;
    private List<IncomingT> currentBatch = new ArrayList<IncomingT>();
    private ScheduledFuture<?> nextFlush;
    private boolean stopped;

    public Debouncer(EventExecutor adminExecutor, Function<List<IncomingT>, CoalescedT> coalescer, Consumer<CoalescedT> onFlush, Duration window, long maxEvents) {
        this("debouncer", adminExecutor, coalescer, onFlush, window, maxEvents);
    }

    public Debouncer(String logPrefix, EventExecutor adminExecutor, Function<List<IncomingT>, CoalescedT> coalescer, Consumer<CoalescedT> onFlush, Duration window, long maxEvents) {
        this.logPrefix = logPrefix;
        this.coalescer = coalescer;
        Preconditions.checkArgument((maxEvents >= 1L ? 1 : 0) != 0, (Object)"maxEvents should be at least 1");
        this.adminExecutor = adminExecutor;
        this.onFlush = onFlush;
        this.window = window;
        this.maxEvents = maxEvents;
    }

    public void receive(IncomingT element) {
        assert (this.adminExecutor.inEventLoop());
        if (this.stopped) {
            return;
        }
        if (this.window.isZero() || this.maxEvents == 1L) {
            LOG.debug("[{}] Received {}, flushing immediately (window = {}, maxEvents = {})", new Object[]{this.logPrefix, element, this.window, this.maxEvents});
            this.onFlush.accept(this.coalescer.apply((List<IncomingT>)ImmutableList.of(element)));
        } else {
            this.currentBatch.add(element);
            if ((long)this.currentBatch.size() == this.maxEvents) {
                LOG.debug("[{}] Received {}, flushing immediately (because {} accumulated events)", new Object[]{this.logPrefix, element, this.maxEvents});
                this.flushNow();
            } else {
                LOG.debug("[{}] Received {}, scheduling next flush in {}", new Object[]{this.logPrefix, element, this.window});
                this.scheduleFlush();
            }
        }
    }

    public void flushNow() {
        assert (this.adminExecutor.inEventLoop());
        LOG.debug("[{}] Flushing now", (Object)this.logPrefix);
        this.cancelNextFlush();
        if (!this.currentBatch.isEmpty()) {
            this.onFlush.accept(this.coalescer.apply(this.currentBatch));
            this.currentBatch = new ArrayList<IncomingT>();
        }
    }

    private void scheduleFlush() {
        assert (this.adminExecutor.inEventLoop());
        this.cancelNextFlush();
        this.nextFlush = this.adminExecutor.schedule(this::flushNow, this.window.toNanos(), TimeUnit.NANOSECONDS);
        this.nextFlush.addListener(UncaughtExceptions::log);
    }

    private void cancelNextFlush() {
        boolean cancelled;
        assert (this.adminExecutor.inEventLoop());
        if (this.nextFlush != null && !this.nextFlush.isDone() && (cancelled = this.nextFlush.cancel(true))) {
            LOG.debug("[{}] Cancelled existing scheduled flush", (Object)this.logPrefix);
        }
    }

    public void stop() {
        assert (this.adminExecutor.inEventLoop());
        if (!this.stopped) {
            this.stopped = true;
            this.cancelNextFlush();
        }
    }
}

