/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.fn.Supplier;
import reactor.rx.action.Action;

public class BufferShiftWhenAction<T>
extends Action<T, List<T>> {
    private final List<List<T>> buckets = new LinkedList<List<T>>();
    private final Supplier<? extends Publisher<?>> bucketClosing;
    private final Publisher<?> bucketOpening;

    public BufferShiftWhenAction(Publisher<?> bucketOpenings, Supplier<? extends Publisher<?>> boundarySupplier) {
        this.bucketClosing = boundarySupplier;
        this.bucketOpening = bucketOpenings;
    }

    @Override
    protected void doOnSubscribe(Subscription subscription) {
        super.doOnSubscribe(subscription);
        this.bucketOpening.subscribe(new Subscriber<Object>(){
            Subscription s;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                s.request(1L);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onNext(Object o) {
                ArrayList newBucket = new ArrayList();
                List list = BufferShiftWhenAction.this.buckets;
                synchronized (list) {
                    BufferShiftWhenAction.this.buckets.add(newBucket);
                }
                ((Publisher)BufferShiftWhenAction.this.bucketClosing.get()).subscribe(new BucketConsumer(newBucket));
                if (this.s != null) {
                    this.s.request(1L);
                }
            }

            @Override
            public void onError(Throwable t) {
                if (this.s != null) {
                    this.s.cancel();
                }
                BufferShiftWhenAction.this.onError(t);
            }

            @Override
            public void onComplete() {
                if (this.s != null) {
                    this.s.cancel();
                }
                BufferShiftWhenAction.this.onComplete();
            }
        });
    }

    @Override
    protected void doError(Throwable ev) {
        this.buckets.clear();
        super.doError(ev);
    }

    @Override
    protected void doComplete() {
        if (!this.buckets.isEmpty()) {
            for (List<T> bucket : this.buckets) {
                this.broadcastNext(bucket);
            }
        }
        this.buckets.clear();
        super.doComplete();
    }

    @Override
    protected void doNext(T value) {
        if (!this.buckets.isEmpty()) {
            for (List<T> bucket : this.buckets) {
                bucket.add(value);
            }
        }
    }

    private class BucketConsumer
    implements Subscriber<Object> {
        final List<T> bucket;
        Subscription s;

        public BucketConsumer(List<T> bucket) {
            this.bucket = bucket;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            this.onComplete();
        }

        @Override
        public void onError(Throwable t) {
            if (this.s != null) {
                this.s.cancel();
            }
            BufferShiftWhenAction.this.onError(t);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onComplete() {
            boolean emit;
            if (this.s != null) {
                this.s.cancel();
            }
            List list = BufferShiftWhenAction.this.buckets;
            synchronized (list) {
                emit = BufferShiftWhenAction.this.buckets.remove(this.bucket);
            }
            if (emit) {
                BufferShiftWhenAction.this.broadcastNext(this.bucket);
            }
        }
    }
}

