/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.pipeline.ComputeStageImplBase;
import com.hazelcast.jet.impl.pipeline.FunctionAdapter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.StageWithWindowImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageWithKeyImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.StageWithWindow;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

public class StreamStageImpl<T>
extends ComputeStageImplBase<T>
implements StreamStage<T> {
    public StreamStageImpl(@Nonnull Transform transform, @Nonnull FunctionAdapter fnAdapter, @Nonnull PipelineImpl pipeline) {
        super(transform, fnAdapter, pipeline);
    }

    StreamStageImpl(StreamStageImpl<T> toCopy, boolean rebalanceOutput) {
        super(toCopy, rebalanceOutput);
    }

    <K> StreamStageImpl(StreamStageImpl<T> toCopy, FunctionEx<? super T, ? extends K> keyFn) {
        super(toCopy, keyFn);
    }

    @Override
    @Nonnull
    public <K> StreamStageWithKey<T, K> groupingKey(@Nonnull FunctionEx<? super T, ? extends K> keyFn) {
        Util.checkSerializable(keyFn, "keyFn");
        return new StreamStageWithKeyImpl<T, K>(this, keyFn);
    }

    @Override
    @Nonnull
    public <K> StreamStage<T> rebalance(@Nonnull FunctionEx<? super T, ? extends K> keyFn) {
        Util.checkSerializable(keyFn, "keyFn");
        FunctionEx<?, ? extends K> adaptedKeyFn = this.fnAdapter.adaptKeyFn(keyFn);
        return new StreamStageImpl(this, adaptedKeyFn);
    }

    @Override
    @Nonnull
    public StreamStage<T> rebalance() {
        return new StreamStageImpl<T>(this, true);
    }

    @Override
    @Nonnull
    public StageWithWindow<T> window(WindowDefinition wDef) {
        return new StageWithWindowImpl(this, wDef);
    }

    @Override
    @Nonnull
    public <R> StreamStage<R> map(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
        return (StreamStage)this.attachMap(mapFn);
    }

    @Override
    @Nonnull
    public StreamStage<T> filter(@Nonnull PredicateEx<T> filterFn) {
        return (StreamStage)this.attachFilter(filterFn);
    }

    @Override
    @Nonnull
    public <R> StreamStage<R> flatMap(@Nonnull FunctionEx<? super T, ? extends Traverser<R>> flatMapFn) {
        return (StreamStage)this.attachFlatMap(flatMapFn);
    }

    @Override
    @Nonnull
    public <S, R> StreamStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        return (StreamStage)this.attachGlobalMapStateful(createFn, mapFn);
    }

    @Override
    @Nonnull
    public <S> StreamStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S, ? super T> filterFn) {
        return (StreamStage)this.attachGlobalMapStateful(createFn, (BiFunctionEx & Serializable)(s, t) -> filterFn.test(s, t) ? t : null);
    }

    @Override
    @Nonnull
    public <S, R> StreamStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        return (StreamStage)this.attachGlobalFlatMapStateful(createFn, flatMapFn);
    }

    @Override
    @Nonnull
    public <S, R> StreamStage<R> mapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        return (StreamStage)this.attachMapUsingService(serviceFactory, mapFn);
    }

    @Override
    @Nonnull
    public <S, R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<R>> mapAsyncFn) {
        return (StreamStage)this.attachMapUsingServiceAsync(serviceFactory, maxConcurrentOps, preserveOrder, (BiFunctionEx & Serializable)(s, t) -> ((CompletableFuture)mapAsyncFn.apply(s, t)).thenApply(Traversers::singleton));
    }

    @Override
    @Nonnull
    public <S, R> StreamStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncBatchedFn) {
        return (StreamStage)this.attachMapUsingServiceAsyncBatched(serviceFactory, maxBatchSize, (BiFunctionEx & Serializable)(s, t) -> ((CompletableFuture)mapAsyncBatchedFn.apply(s, t)).thenApply(list -> Util.toList(list, Traversers::singleton)));
    }

    @Override
    @Nonnull
    public <S> StreamStage<T> filterUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiPredicateEx<? super S, ? super T> filterFn) {
        return (StreamStage)this.attachFilterUsingService(serviceFactory, filterFn);
    }

    @Override
    @Nonnull
    public <S, R> StreamStage<R> flatMapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        return (StreamStage)this.attachFlatMapUsingService(serviceFactory, flatMapFn);
    }

    @Override
    @Nonnull
    public StreamStage<T> merge(@Nonnull StreamStage<? extends T> other) {
        return (StreamStage)this.attachMerge(other);
    }

    @Override
    @Nonnull
    public <K, T1_IN, T1, R> StreamStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BiFunctionEx<T, T1, R> mapToOutputFn) {
        return (StreamStage)this.attachHashJoin(stage1, joinClause1, mapToOutputFn);
    }

    @Override
    @Nonnull
    public <K, T1_IN, T1, R> StreamStage<R> innerHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BiFunctionEx<T, T1, R> mapToOutputFn) {
        BiFunctionEx & Serializable finalOutputFn = (BiFunctionEx & Serializable)(leftSide, rightSide) -> {
            if (leftSide == null || rightSide == null) {
                return null;
            }
            return mapToOutputFn.apply(leftSide, rightSide);
        };
        return (StreamStage)this.attachHashJoin(stage1, joinClause1, finalOutputFn);
    }

    @Override
    @Nonnull
    public <K1, K2, T1_IN, T2_IN, T1, T2, R> StreamStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull TriFunction<T, T1, T2, R> mapToOutputFn) {
        return (StreamStage)this.attachHashJoin2(stage1, joinClause1, stage2, joinClause2, mapToOutputFn);
    }

    @Override
    @Nonnull
    public <K1, K2, T1_IN, T2_IN, T1, T2, R> StreamStage<R> innerHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull TriFunction<T, T1, T2, R> mapToOutputFn) {
        TriFunction<Object, Object, Object, Object> finalOutputFn = (leftSide, middle, rightSide) -> {
            if (leftSide == null || middle == null || rightSide == null) {
                return null;
            }
            return mapToOutputFn.apply(leftSide, middle, rightSide);
        };
        return (StreamStage)this.attachHashJoin2(stage1, joinClause1, stage2, joinClause2, finalOutputFn);
    }

    @Override
    @Nonnull
    public StreamStage<T> peek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn) {
        return (StreamStage)this.attachPeek(shouldLogFn, toStringFn);
    }

    @Override
    @Nonnull
    public <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier) {
        return (StreamStage)this.attachCustomTransform(stageName, procSupplier);
    }

    @Override
    <RET> RET newStage(@Nonnull AbstractTransform transform, @Nonnull FunctionAdapter fnAdapter) {
        return (RET)new StreamStageImpl<T>(transform, fnAdapter, this.pipelineImpl);
    }

    @Override
    @Nonnull
    public StreamStage<T> setLocalParallelism(int localParallelism) {
        super.setLocalParallelism(localParallelism);
        return this;
    }

    @Override
    @Nonnull
    public StreamStage<T> setName(@Nonnull String name) {
        super.setName(name);
        return this;
    }
}

