/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.internal.react;

import com.aol.cyclops.Monoid;
import com.aol.cyclops.Reducer;
import com.aol.cyclops.control.LazyReact;
import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.data.async.QueueFactories;
import com.aol.cyclops.data.async.QueueFactory;
import com.aol.cyclops.data.collections.extensions.standard.ListX;
import com.aol.cyclops.internal.react.DelegateStream;
import com.aol.cyclops.internal.react.stream.LazyStreamWrapper;
import com.aol.cyclops.react.async.subscription.Continueable;
import com.aol.cyclops.react.async.subscription.Subscription;
import com.aol.cyclops.react.collectors.lazy.BatchingCollector;
import com.aol.cyclops.react.collectors.lazy.LazyResultConsumer;
import com.aol.cyclops.react.collectors.lazy.MaxActive;
import com.aol.cyclops.react.threads.ReactPool;
import com.aol.cyclops.types.futurestream.LazyFutureStream;
import com.aol.cyclops.types.stream.HeadAndTail;
import com.aol.cyclops.types.stream.HotStream;
import com.aol.cyclops.types.stream.PausableHotStream;
import com.aol.cyclops.util.stream.StreamUtils;
import com.nurkiewicz.asyncretry.RetryExecutor;
import java.beans.ConstructorProperties;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.lambda.Collectable;
import org.jooq.lambda.Seq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LazyFutureStreamImpl<U>
implements LazyFutureStream<U> {
    private static final Logger log = LoggerFactory.getLogger(LazyFutureStreamImpl.class);
    private final Optional<Consumer<Throwable>> errorHandler;
    private final LazyStreamWrapper<U> lastActive;
    private final Supplier<LazyResultConsumer<U>> lazyCollector;
    private final QueueFactory<U> queueFactory;
    private final LazyReact simpleReact;
    private final Continueable subscription;
    private static final ReactPool<LazyReact> pool = ReactPool.elasticPool(() -> new LazyReact(Executors.newSingleThreadExecutor()));
    private final ConsumerHolder error;
    private final MaxActive maxActive;

    public LazyFutureStreamImpl(LazyReact lazyReact, Stream<U> stream) {
        this.simpleReact = lazyReact;
        this.lastActive = new LazyStreamWrapper(stream, lazyReact);
        this.error = new ConsumerHolder(a -> {});
        this.errorHandler = Optional.of(e -> {
            this.error.forward.accept((Throwable)e);
            log.error(e.getMessage(), e);
        });
        this.lazyCollector = () -> new BatchingCollector(this.getMaxActive(), this);
        this.queueFactory = QueueFactories.unboundedNonBlockingQueue();
        this.subscription = new Subscription();
        this.maxActive = lazyReact.getMaxActive();
    }

    @Override
    public void forwardErrors(Consumer<Throwable> c) {
        this.error.forward = c;
    }

    @Override
    public LazyReact getPopulator() {
        return pool.nextReactor();
    }

    @Override
    public void returnPopulator(LazyReact service) {
        pool.populate(service);
    }

    @Override
    public <R, A> R collect(Collector<? super U, A, R> collector) {
        return this.block(collector);
    }

    @Override
    public void close() {
    }

    @Override
    public LazyFutureStream<U> withAsync(boolean b) {
        return this.withSimpleReact(this.simpleReact.withAsync(b));
    }

    @Override
    public Executor getTaskExecutor() {
        return this.simpleReact.getExecutor();
    }

    @Override
    public RetryExecutor getRetrier() {
        return this.simpleReact.getRetrier();
    }

    @Override
    public boolean isAsync() {
        return this.simpleReact.isAsync();
    }

    @Override
    public LazyFutureStream<U> withTaskExecutor(Executor e) {
        return this.withSimpleReact(this.simpleReact.withExecutor(e));
    }

    @Override
    public LazyFutureStream<U> withRetrier(RetryExecutor retry) {
        return this.withSimpleReact(this.simpleReact.withRetrier(retry));
    }

    @Override
    public LazyFutureStream<U> withLastActive(LazyStreamWrapper w) {
        return new LazyFutureStreamImpl<U>(this.errorHandler, w, this.lazyCollector, this.queueFactory, this.simpleReact, this.subscription, this.error, this.maxActive);
    }

    @Override
    public LazyFutureStream<U> maxActive(int max) {
        return this.withMaxActive(new MaxActive(max, max));
    }

    @Override
    public void cancel() {
        this.subscription.closeAll();
    }

    @Override
    public HotStream<U> schedule(String cron, ScheduledExecutorService ex) {
        return ReactiveSeq.fromStream(this.toStream()).schedule(cron, ex);
    }

    @Override
    public HotStream<U> scheduleFixedDelay(long delay, ScheduledExecutorService ex) {
        return ReactiveSeq.fromStream(this.toStream()).scheduleFixedDelay(delay, ex);
    }

    @Override
    public HotStream<U> scheduleFixedRate(long rate, ScheduledExecutorService ex) {
        return ReactiveSeq.fromStream(this.toStream()).scheduleFixedRate(rate, ex);
    }

    @Override
    public <T> LazyFutureStream<T> unitIterator(Iterator<T> it) {
        return this.simpleReact.from(it);
    }

    @Override
    public LazyFutureStream<U> append(U value) {
        return this.fromStream((Stream)this.stream().append(value));
    }

    @Override
    public LazyFutureStream<U> prepend(U value) {
        return this.fromStream((Stream)this.stream().prepend(value));
    }

    @Override
    public <T> LazyFutureStream<T> unit(T unit) {
        return this.fromStream((Stream)((Object)this.stream().unit(unit)));
    }

    @Override
    public HotStream<U> hotStream(Executor e) {
        return StreamUtils.hotStream(this, e);
    }

    @Override
    public HotStream<U> primedHotStream(Executor e) {
        return StreamUtils.primedHotStream(this, e);
    }

    @Override
    public PausableHotStream<U> pausableHotStream(Executor e) {
        return StreamUtils.pausableHotStream(this, e);
    }

    @Override
    public PausableHotStream<U> primedPausableHotStream(Executor e) {
        return StreamUtils.primedPausableHotStream(this, e);
    }

    public String format() {
        return Seq.seq((Seq)this).format();
    }

    @Override
    public Collectable<U> collectable() {
        return Seq.seq(new DelegateStream(this));
    }

    @Override
    public U foldRight(Monoid<U> reducer) {
        return reducer.reduce(this);
    }

    @Override
    public <T> T foldRightMapToType(Reducer<T> reducer) {
        return reducer.mapReduce(this.reverse());
    }

    @Override
    public <R> R mapReduce(Reducer<R> reducer) {
        return reducer.mapReduce(this);
    }

    @Override
    public <R> R mapReduce(Function<? super U, ? extends R> mapper, Monoid<R> reducer) {
        return Reducer.fromMonoid(reducer, mapper).mapReduce(this);
    }

    @Override
    public U reduce(Monoid<U> reducer) {
        return reducer.reduce(this);
    }

    @Override
    public ListX<U> reduce(Stream<? extends Monoid<U>> reducers) {
        return StreamUtils.reduce(this, reducers);
    }

    @Override
    public ListX<U> reduce(Iterable<? extends Monoid<U>> reducers) {
        return StreamUtils.reduce(this, reducers);
    }

    @Override
    public U foldRight(U identity, BinaryOperator<U> accumulator) {
        return (U)this.reverse().foldLeft(identity, accumulator);
    }

    @Override
    public Optional<U> min(Comparator<? super U> comparator) {
        return StreamUtils.min(this, comparator);
    }

    @Override
    public Optional<U> max(Comparator<? super U> comparator) {
        return StreamUtils.max(this, comparator);
    }

    @Override
    public long count() {
        return this.collect(Collectors.toList()).size();
    }

    @Override
    public boolean allMatch(Predicate<? super U> c) {
        return this.filterNot((Predicate)c).count() == 0L;
    }

    @Override
    public boolean anyMatch(Predicate<? super U> c) {
        return this.filter((Predicate)c).findAny().isPresent();
    }

    @Override
    public boolean xMatch(int num, Predicate<? super U> c) {
        return StreamUtils.xMatch(this, num, c);
    }

    @Override
    public boolean noneMatch(Predicate<? super U> c) {
        return !this.anyMatch(c);
    }

    @Override
    public final String join() {
        return StreamUtils.join(this);
    }

    @Override
    public final String join(String sep) {
        return StreamUtils.join(this, sep);
    }

    @Override
    public String join(String sep, String start, String end) {
        return StreamUtils.join(this, sep, start, end);
    }

    @Override
    public HeadAndTail<U> headAndTail() {
        return StreamUtils.headAndTail(this);
    }

    @Override
    public LazyFutureStreamImpl<U> withErrorHandler(Optional<Consumer<Throwable>> errorHandler) {
        return this.errorHandler == errorHandler ? this : new LazyFutureStreamImpl<U>(errorHandler, this.lastActive, this.lazyCollector, this.queueFactory, this.simpleReact, this.subscription, this.error, this.maxActive);
    }

    @Override
    public LazyFutureStreamImpl<U> withLazyCollector(Supplier<LazyResultConsumer<U>> lazyCollector) {
        return this.lazyCollector == lazyCollector ? this : new LazyFutureStreamImpl<U>(this.errorHandler, this.lastActive, lazyCollector, this.queueFactory, this.simpleReact, this.subscription, this.error, this.maxActive);
    }

    @Override
    public LazyFutureStreamImpl<U> withQueueFactory(QueueFactory<U> queueFactory) {
        return this.queueFactory == queueFactory ? this : new LazyFutureStreamImpl<U>(this.errorHandler, this.lastActive, this.lazyCollector, queueFactory, this.simpleReact, this.subscription, this.error, this.maxActive);
    }

    public LazyFutureStreamImpl<U> withSimpleReact(LazyReact simpleReact) {
        return this.simpleReact == simpleReact ? this : new LazyFutureStreamImpl<U>(this.errorHandler, this.lastActive, this.lazyCollector, this.queueFactory, simpleReact, this.subscription, this.error, this.maxActive);
    }

    @Override
    public LazyFutureStreamImpl<U> withSubscription(Continueable subscription) {
        return this.subscription == subscription ? this : new LazyFutureStreamImpl<U>(this.errorHandler, this.lastActive, this.lazyCollector, this.queueFactory, this.simpleReact, subscription, this.error, this.maxActive);
    }

    public LazyFutureStreamImpl<U> withError(ConsumerHolder error) {
        return this.error == error ? this : new LazyFutureStreamImpl<U>(this.errorHandler, this.lastActive, this.lazyCollector, this.queueFactory, this.simpleReact, this.subscription, error, this.maxActive);
    }

    public LazyFutureStreamImpl<U> withMaxActive(MaxActive maxActive) {
        return this.maxActive == maxActive ? this : new LazyFutureStreamImpl<U>(this.errorHandler, this.lastActive, this.lazyCollector, this.queueFactory, this.simpleReact, this.subscription, this.error, maxActive);
    }

    @Override
    public Optional<Consumer<Throwable>> getErrorHandler() {
        return this.errorHandler;
    }

    @Override
    public LazyStreamWrapper<U> getLastActive() {
        return this.lastActive;
    }

    @Override
    public Supplier<LazyResultConsumer<U>> getLazyCollector() {
        return this.lazyCollector;
    }

    @Override
    public QueueFactory<U> getQueueFactory() {
        return this.queueFactory;
    }

    @Override
    public LazyReact getSimpleReact() {
        return this.simpleReact;
    }

    @Override
    public Continueable getSubscription() {
        return this.subscription;
    }

    public ConsumerHolder getError() {
        return this.error;
    }

    @Override
    public MaxActive getMaxActive() {
        return this.maxActive;
    }

    private LazyFutureStreamImpl(Optional<Consumer<Throwable>> errorHandler, LazyStreamWrapper<U> lastActive, Supplier<LazyResultConsumer<U>> lazyCollector, QueueFactory<U> queueFactory, LazyReact simpleReact, Continueable subscription, ConsumerHolder error, MaxActive maxActive) {
        this.errorHandler = errorHandler;
        this.lastActive = lastActive;
        this.lazyCollector = lazyCollector;
        this.queueFactory = queueFactory;
        this.simpleReact = simpleReact;
        this.subscription = subscription;
        this.error = error;
        this.maxActive = maxActive;
    }

    static class ConsumerHolder {
        volatile Consumer<Throwable> forward;

        @ConstructorProperties(value={"forward"})
        public ConsumerHolder(Consumer<Throwable> forward) {
            this.forward = forward;
        }
    }
}

