/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.subscription.PushSubscription;

public class BarrierStream
extends Stream<List<Object>> {
    private final AtomicInteger wrappedCnt = new AtomicInteger(0);
    private final AtomicInteger resultCnt = new AtomicInteger(0);
    private final List<Object> values = new ArrayList<Object>();
    private PushSubscription<List<Object>> downstream;

    public BarrierStream() {
    }

    public BarrierStream(Environment env) {
        this.dispatchOn(env);
    }

    public BarrierStream(Dispatcher dispatcher) {
        this.dispatchOn(dispatcher);
    }

    public BarrierStream(Environment env, Dispatcher dispatcher) {
        this.dispatchOn(env, dispatcher);
    }

    public <I, O> Function<I, O> wrap(final Function<I, O> fn) {
        if (null != this.downstream && this.downstream.isComplete()) {
            throw new IllegalStateException("This BarrierStream is already complete");
        }
        final int valIdx = this.wrappedCnt.getAndIncrement();
        return new Function<I, O>(){

            @Override
            public O apply(I in) {
                Object out = fn.apply(in);
                BarrierStream.this.addResult(valIdx, out);
                return out;
            }
        };
    }

    public <I> Consumer<I> wrap(final Consumer<I> consumer) {
        if (null != this.downstream && this.downstream.isComplete()) {
            throw new IllegalStateException("This BarrierStream is already complete");
        }
        final int valIdx = this.wrappedCnt.getAndIncrement();
        return new Consumer<I>(){

            @Override
            public void accept(I in) {
                consumer.accept(in);
                BarrierStream.this.addResult(valIdx, in);
            }
        };
    }

    @Override
    public void subscribe(Subscriber<? super List<Object>> s) {
        if (null != this.downstream) {
            throw new IllegalStateException("This BarrierStream already has a Subscriber");
        }
        this.downstream = new PushSubscription<List<Object>>((Stream)this, s){

            @Override
            public void request(long n) {
                super.request(n);
                if (BarrierStream.this.resultCnt.get() == BarrierStream.this.wrappedCnt.get()) {
                    try {
                        this.onNext(BarrierStream.this.values);
                        this.onComplete();
                    }
                    catch (Throwable t) {
                        this.onError(t);
                    }
                }
            }
        };
        try {
            s.onSubscribe(this.downstream);
        }
        catch (Throwable throwable) {
            Exceptions.throwIfFatal(throwable);
            s.onError(throwable);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addResult(int idx, Object obj) {
        List<Object> list = this.values;
        synchronized (list) {
            if (this.values.size() == idx) {
                this.values.add(obj);
            } else if (this.values.size() < idx) {
                for (int i = this.values.size(); i < idx; ++i) {
                    this.values.add(null);
                }
                this.values.add(obj);
            } else {
                this.values.set(idx, obj);
            }
        }
        if (this.resultCnt.incrementAndGet() == this.wrappedCnt.get() && null != this.downstream && !this.downstream.isComplete()) {
            this.downstream.onNext(this.values);
            this.downstream.onComplete();
        }
    }
}

