/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.processor.InsufficientCapacityException;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.action.Action;
import reactor.rx.subscription.BatchSubscription;
import reactor.rx.subscription.PushSubscription;

public final class BufferShiftAction<T>
extends Action<T, List<T>> {
    private final List<List<T>> buckets = new LinkedList<List<T>>();
    private final Consumer<Long> timeshiftTask;
    private final long timeshift;
    private final TimeUnit unit;
    private final Timer timer;
    private final int skip;
    private final int batchSize;
    private Pausable timeshiftRegistration;
    private int index;

    public BufferShiftAction(Dispatcher dispatcher, int size, int skip) {
        this(dispatcher, size, skip, -1L, -1L, null, null);
    }

    public BufferShiftAction(final Dispatcher dispatcher, int size, int skip, long timeshift, final long timespan, TimeUnit unit, final Timer timer) {
        super(size);
        this.skip = skip;
        this.batchSize = size;
        if (timespan > 0L && timeshift > 0L) {
            final TimeUnit targetUnit = unit != null ? unit : TimeUnit.SECONDS;
            final Consumer flushTimerTask = new Consumer<List<T>>(){

                @Override
                public void accept(List<T> bucket) {
                    Iterator it = BufferShiftAction.this.buckets.iterator();
                    while (it.hasNext()) {
                        List itBucket = (List)it.next();
                        if (bucket != itBucket) continue;
                        it.remove();
                        BufferShiftAction.this.broadcastNext(bucket);
                        break;
                    }
                }
            };
            this.timeshiftTask = new Consumer<Long>(){

                @Override
                public void accept(Long aLong) {
                    try {
                        if (!BufferShiftAction.this.isPublishing()) {
                            return;
                        }
                        dispatcher.tryDispatch(null, new Consumer<Void>(){

                            @Override
                            public void accept(Void aVoid) {
                                final ArrayList bucket = new ArrayList();
                                BufferShiftAction.this.buckets.add(bucket);
                                timer.submit(new Consumer<Long>(){

                                    @Override
                                    public void accept(Long aLong) {
                                        dispatcher.dispatch(bucket, flushTimerTask, null);
                                    }
                                }, timespan, targetUnit);
                            }
                        }, null);
                    }
                    catch (InsufficientCapacityException insufficientCapacityException) {
                        // empty catch block
                    }
                }
            };
            this.timeshift = timeshift;
            this.unit = targetUnit;
            this.timer = timer;
        } else {
            this.timeshift = -1L;
            this.unit = null;
            this.timer = null;
            this.timeshiftTask = null;
        }
    }

    @Override
    protected void doOnSubscribe(Subscription subscription) {
        if (this.timer != null) {
            this.timeshiftRegistration = this.timer.schedule(this.timeshiftTask, this.timeshift, this.unit);
        }
    }

    @Override
    protected PushSubscription<T> createTrackingSubscription(Subscription subscription) {
        return new BatchSubscription(subscription, this, this.skip + this.batchSize);
    }

    @Override
    protected void doNext(T value) {
        if (this.timer == null && this.index++ % this.skip == 0) {
            this.buckets.add(this.batchSize < 2048 ? new ArrayList(this.batchSize) : new ArrayList());
        }
        this.flushCallback(value);
    }

    @Override
    protected void doError(Throwable ev) {
        this.buckets.clear();
        super.doError(ev);
    }

    @Override
    protected void doComplete() {
        for (List<T> bucket : this.buckets) {
            this.broadcastNext(bucket);
        }
        this.buckets.clear();
        super.doComplete();
    }

    private void flushCallback(T event) {
        Iterator<List<T>> it = this.buckets.iterator();
        while (it.hasNext()) {
            List<T> bucket = it.next();
            bucket.add(event);
            if (bucket.size() != this.batchSize) continue;
            it.remove();
            this.broadcastNext(bucket);
        }
    }

    @Override
    public String toString() {
        return super.toString() + "{skip=" + this.skip + "}";
    }
}

