/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;

public class BatchingPublisher<T>
extends BufferingPublisher<T> {
    public BatchingPublisher(final Publisher<T> upstream, final int batchSize, Action<? super T> disposer) {
        super(disposer, (? super BufferedWriteStream<T> write) -> new Subscription((BufferedWriteStream)write){
            private Subscription subscription;
            private int batchCounter;
            private State state;
            final /* synthetic */ BufferedWriteStream val$write;
            {
                this.val$write = bufferedWriteStream;
                this.batchCounter = batchSize;
                this.state = State.Idle;
            }

            public void request(long n) {
                if (this.state == State.Closed) {
                    return;
                }
                if (this.subscription == null) {
                    upstream.subscribe(new Subscriber<T>(){

                        public void onSubscribe(Subscription s) {
                            subscription = s;
                            s.request((long)batchSize);
                        }

                        public void onNext(T t) {
                            if (state != State.Closed) {
                                state = State.Writing;
                                val$write.item(t);
                                if (--batchCounter == 0 && val$write.getRequested() > 0L) {
                                    this.maybeFetch();
                                }
                            }
                        }

                        public void onError(Throwable t) {
                            state = State.Closed;
                            val$write.error(t);
                        }

                        public void onComplete() {
                            state = State.Closed;
                            val$write.complete();
                        }
                    });
                } else if (this.batchCounter == 0) {
                    this.maybeFetch();
                }
            }

            private void maybeFetch() {
                if (this.state != State.Fetching) {
                    this.state = State.Fetching;
                    this.batchCounter = batchSize;
                    this.subscription.request((long)batchSize);
                }
            }

            public void cancel() {
                this.state = State.Closed;
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
            }
        });
    }

    private static enum State {
        Fetching,
        Writing,
        Closed,
        Idle;

    }
}

