/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.BiPredicateEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.PredicateEx;
import com.hazelcast.jet.function.ToLongFunctionEx;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.AbstractStage;
import com.hazelcast.jet.impl.pipeline.FunctionAdapter;
import com.hazelcast.jet.impl.pipeline.JetEventFunctionAdapter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.SinkImpl;
import com.hazelcast.jet.impl.pipeline.SinkStageImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.FilterTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapTransform;
import com.hazelcast.jet.impl.pipeline.transform.GlobalRollingAggregateTransform;
import com.hazelcast.jet.impl.pipeline.transform.HashJoinTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapTransform;
import com.hazelcast.jet.impl.pipeline.transform.MergeTransform;
import com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.PeekTransform;
import com.hazelcast.jet.impl.pipeline.transform.ProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.RollingAggregateTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.ContextFactory;
import com.hazelcast.jet.pipeline.GeneralStage;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.util.Preconditions;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/**
 * Common base of the compute-stage implementations. It holds the
 * {@link FunctionAdapter} that the stage applies to user-supplied functions
 * and provides the {@code attachXxx} helpers.
 * <p>
 * Every {@code attachXxx} helper follows the same shape: it validates the
 * user-supplied function(s) with {@code checkSerializable} where applicable,
 * passes them through {@link #fnAdapter}, builds a transform whose upstream is
 * this stage's {@code transform}, and hands that transform to
 * {@link #attach(AbstractTransform, FunctionAdapter)}, whose result it returns.
 *
 * @param <T> type of the items this stage emits
 */
public abstract class ComputeStageImplBase<T>
extends AbstractStage {
    /**
     * Adapter held by a stage that has no timestamps assigned;
     * {@link #addTimestamps} only accepts a stage whose adapter equals it.
     */
    static final FunctionAdapter DO_NOT_ADAPT = new FunctionAdapter();
    /**
     * Adapter given to the stage created by {@link #addTimestamps}. A stage
     * holding it is treated as timestamped by {@link #ensureJetEvents} and
     * {@link #drainTo}.
     */
    static final JetEventFunctionAdapter ADAPT_TO_JET_EVENT = new JetEventFunctionAdapter();
    /** Adapter applied to every user function before it is put into a transform. */
    @Nonnull
    public FunctionAdapter fnAdapter;

    /**
     * Creates the stage and records the function adapter it will use.
     *
     * @param transform         transform backing this stage
     * @param fnAdapter         adapter to apply to user functions
     * @param pipelineImpl      pipeline this stage belongs to
     * @param acceptsDownstream forwarded unchanged to the {@code AbstractStage} constructor
     */
    ComputeStageImplBase(@Nonnull Transform transform, @Nonnull FunctionAdapter fnAdapter, @Nonnull PipelineImpl pipelineImpl, boolean acceptsDownstream) {
        super(transform, acceptsDownstream, pipelineImpl);
        this.fnAdapter = fnAdapter;
    }

    /**
     * Adds a timestamping transform after this stage and returns a stream
     * stage over it that uses {@link #ADAPT_TO_JET_EVENT}.
     *
     * @param timestampFn     extracts the timestamp from an item
     * @param allowedLateness lag passed to {@code WatermarkPolicy.limitingLag}
     * @return the new stream stage
     * @throws RuntimeException if this stage's adapter is not {@link #DO_NOT_ADAPT}
     *         (raised by {@code Preconditions.checkTrue}; the exact exception type
     *         is not visible in this file)
     */
    @Nonnull
    public StreamStage<T> addTimestamps(@Nonnull ToLongFunctionEx<? super T> timestampFn, long allowedLateness) {
        Preconditions.checkTrue(this.fnAdapter.equals(DO_NOT_ADAPT), "This stage already has timestamps assigned to it");
        com.hazelcast.jet.impl.util.Util.checkSerializable(timestampFn, "timestampFn");
        // Each item is wrapped into a JetEvent carrying the extracted timestamp.
        // NOTE(review): the trailing 0L, 0L, 60000L are presumably the watermark
        // throttling frame size/offset and the idle timeout in milliseconds;
        // confirm against EventTimePolicy.eventTimePolicy.
        TimestampTransform<Object> tsTransform = new TimestampTransform<Object>(this.transform, EventTimePolicy.eventTimePolicy(timestampFn, (item, ts) -> JetEvent.jetEvent(ts, item), WatermarkPolicy.limitingLag(allowedLateness), 0L, 0L, 60000L));
        this.pipelineImpl.connect(this.transform, tsTransform);
        return new StreamStageImpl(tsTransform, ADAPT_TO_JET_EVENT, this.pipelineImpl);
    }

    /**
     * Attaches a {@link MapTransform} that uses the adapted {@code mapFn}.
     *
     * @param mapFn mapping function; checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <R, RET> RET attachMap(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapFn, "mapFn");
        return this.attach(new MapTransform(this.transform, this.fnAdapter.adaptMapFn(mapFn)), this.fnAdapter);
    }

    /**
     * Attaches a {@link FilterTransform} that uses the adapted {@code filterFn}.
     *
     * @param filterFn filtering predicate; checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <RET> RET attachFilter(@Nonnull PredicateEx<T> filterFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(filterFn, "filterFn");
        return this.attach(new FilterTransform(this.transform, this.fnAdapter.adaptFilterFn(filterFn)), this.fnAdapter);
    }

    /**
     * Attaches a {@link FlatMapTransform} that uses the adapted {@code flatMapFn}.
     *
     * @param flatMapFn function producing a traverser per item; checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <R, RET> RET attachFlatMap(@Nonnull FunctionEx<? super T, ? extends Traverser<? extends R>> flatMapFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(flatMapFn, "flatMapFn");
        return this.attach(new FlatMapTransform(this.transform, this.fnAdapter.adaptFlatMapFn(flatMapFn)), this.fnAdapter);
    }

    /**
     * Attaches the transform built by
     * {@code ProcessorTransform.mapUsingContextTransform} from the context
     * factory and the adapted {@code mapFn}.
     *
     * @param contextFactory factory of the context object passed to {@code mapFn}
     * @param mapFn          mapping function taking the context and the item;
     *                       checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, R, RET> RET attachMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, ? extends R> mapFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapFn, "mapFn");
        BiFunctionEx<? super C, ?, ?> adaptedMapFn = this.fnAdapter.adaptMapUsingContextFn(mapFn);
        return this.attach(ProcessorTransform.mapUsingContextTransform(this.transform, contextFactory, adaptedMapFn), this.fnAdapter);
    }

    /**
     * Attaches the transform built by
     * {@code ProcessorTransform.filterUsingContextTransform} from the context
     * factory and the adapted {@code filterFn}.
     *
     * @param contextFactory factory of the context object passed to {@code filterFn}
     * @param filterFn       predicate taking the context and the item;
     *                       checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, RET> RET attachFilterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiPredicateEx<? super C, ? super T> filterFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(filterFn, "filterFn");
        BiPredicateEx<? super C, ?> adaptedFilterFn = this.fnAdapter.adaptFilterUsingContextFn(filterFn);
        return this.attach(ProcessorTransform.filterUsingContextTransform(this.transform, contextFactory, adaptedFilterFn), this.fnAdapter);
    }

    /**
     * Attaches the transform built by
     * {@code ProcessorTransform.flatMapUsingContextTransform} from the context
     * factory and the adapted {@code flatMapFn}.
     *
     * @param contextFactory factory of the context object passed to {@code flatMapFn}
     * @param flatMapFn      function taking the context and the item and
     *                       returning a traverser; checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, R, RET> RET attachFlatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(flatMapFn, "flatMapFn");
        BiFunctionEx<? super C, ?, Traverser<?>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingContextFn(flatMapFn);
        return this.attach(ProcessorTransform.flatMapUsingContextTransform(this.transform, contextFactory, adaptedFlatMapFn), this.fnAdapter);
    }

    /**
     * Attaches the transform built by
     * {@code ProcessorTransform.flatMapUsingContextAsyncTransform} from the
     * context factory and the adapted asynchronous function.
     *
     * @param operationName  name passed on to the transform factory; also used
     *                       to build the name ({@code operationName + "AsyncFn"})
     *                       under which the function is checked for serializability
     * @param contextFactory factory of the context object passed to the function
     * @param flatMapAsyncFn function taking the context and the item and
     *                       returning a future of a traverser
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, R, RET> RET attachFlatMapUsingContextAsync(@Nonnull String operationName, @Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, ? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(flatMapAsyncFn, operationName + "AsyncFn");
        BiFunctionEx<? super C, ?, CompletableFuture<Traverser<?>>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingContextAsyncFn(flatMapAsyncFn);
        return this.attach(ProcessorTransform.flatMapUsingContextAsyncTransform(this.transform, operationName, contextFactory, adaptedFlatMapFn), this.fnAdapter);
    }

    /**
     * Partitioned variant of {@link #attachMapUsingContext}: additionally
     * adapts {@code partitionKeyFn} and builds the transform with
     * {@code PartitionedProcessorTransform.mapUsingContextPartitionedTransform}.
     *
     * @param contextFactory factory of the context object passed to {@code mapFn}
     * @param partitionKeyFn extracts the partitioning key from an item;
     *                       checked for serializability
     * @param mapFn          mapping function taking the context and the item;
     *                       checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, K, R, RET> RET attachMapUsingPartitionedContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super C, ? super T, ? extends R> mapFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapFn, "mapFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        BiFunctionEx<? super C, ?, ?> adaptedMapFn = this.fnAdapter.adaptMapUsingContextFn(mapFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.mapUsingContextPartitionedTransform(this.transform, contextFactory, adaptedMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Partitioned variant of {@link #attachFilterUsingContext}: additionally
     * adapts {@code partitionKeyFn} and builds the transform with
     * {@code PartitionedProcessorTransform.filterUsingPartitionedContextTransform}.
     *
     * @param contextFactory factory of the context object passed to {@code filterFn}
     * @param partitionKeyFn extracts the partitioning key from an item;
     *                       checked for serializability
     * @param filterFn       predicate taking the context and the item;
     *                       checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, K, RET> RET attachFilterUsingPartitionedContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiPredicateEx<? super C, ? super T> filterFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(filterFn, "filterFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        BiPredicateEx<? super C, ?> adaptedFilterFn = this.fnAdapter.adaptFilterUsingContextFn(filterFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.filterUsingPartitionedContextTransform(this.transform, contextFactory, adaptedFilterFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Partitioned variant of {@link #attachFlatMapUsingContext}: additionally
     * adapts {@code partitionKeyFn} and builds the transform with
     * {@code PartitionedProcessorTransform.flatMapUsingPartitionedContextTransform}.
     *
     * @param contextFactory factory of the context object passed to {@code flatMapFn}
     * @param partitionKeyFn extracts the partitioning key from an item;
     *                       checked for serializability
     * @param flatMapFn      function taking the context and the item and
     *                       returning a traverser; checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, K, R, RET> RET attachFlatMapUsingPartitionedContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(flatMapFn, "flatMapFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        BiFunctionEx<? super C, ?, Traverser<?>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingContextFn(flatMapFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.flatMapUsingPartitionedContextTransform(this.transform, contextFactory, adaptedFlatMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Partitioned variant of {@link #attachFlatMapUsingContextAsync}:
     * additionally adapts {@code partitionKeyFn} and builds the transform with
     * {@code PartitionedProcessorTransform.flatMapUsingPartitionedContextAsyncTransform}.
     *
     * @param operationName  name passed on to the transform factory; also used
     *                       to build the name ({@code operationName + "AsyncFn"})
     *                       under which the function is checked for serializability
     * @param contextFactory factory of the context object passed to the function
     * @param partitionKeyFn extracts the partitioning key from an item;
     *                       checked for serializability
     * @param flatMapAsyncFn function taking the context and the item and
     *                       returning a future of a traverser
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <C, K, R, RET> RET attachTransformUsingPartitionedContextAsync(@Nonnull String operationName, @Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> flatMapAsyncFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(flatMapAsyncFn, operationName + "AsyncFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        BiFunctionEx<? super C, ?, CompletableFuture<Traverser<?>>> adaptedFlatMapFn = this.fnAdapter.adaptFlatMapUsingContextAsyncFn(flatMapAsyncFn);
        FunctionEx<?, ? extends K> adaptedPartitionKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.flatMapUsingPartitionedContextAsyncTransform(this.transform, operationName, contextFactory, adaptedFlatMapFn, adaptedPartitionKeyFn), this.fnAdapter);
    }

    /**
     * Attaches a {@link RollingAggregateTransform} built from the adapted key
     * function, the adapted aggregate operation and an adapted output function
     * that pairs the key with the result through {@code Util::entry}.
     *
     * @param keyFn  extracts the grouping key from an item; checked for serializability
     * @param aggrOp aggregate operation to apply
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <K, R, OUT, RET> RET attachRollingAggregate(FunctionEx<? super T, ? extends K> keyFn, @Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(keyFn, "keyFn");
        return this.attach(new RollingAggregateTransform(this.transform, this.fnAdapter.adaptKeyFn(keyFn), this.fnAdapter.adaptAggregateOperation1(aggrOp), this.fnAdapter.adaptRollingAggregateOutputFn(Util::entry)), this.fnAdapter);
    }

    /**
     * Attaches a {@link GlobalRollingAggregateTransform} built from the adapted
     * aggregate operation and an adapted output function that discards the key
     * and keeps only the result.
     *
     * @param aggrOp aggregate operation to apply
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <R, RET> RET attachGlobalRollingAggregate(@Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp) {
        // Named so that it does not shadow the inherited "transform" field used as upstream.
        GlobalRollingAggregateTransform aggregateTransform = new GlobalRollingAggregateTransform(this.transform, this.fnAdapter.adaptAggregateOperation1(aggrOp), this.fnAdapter.adaptRollingAggregateOutputFn((key, result) -> result));
        return this.attach(aggregateTransform, this.fnAdapter);
    }

    /**
     * Attaches a {@link MergeTransform} whose inputs are this stage's transform
     * and the transform of {@code other}.
     *
     * @param other stage to merge with; cast to {@link AbstractStage} to reach
     *              its transform, so any other implementation fails with a
     *              {@code ClassCastException}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <RET> RET attachMerge(@Nonnull GeneralStage<? extends T> other) {
        return this.attach(new MergeTransform(this.transform, ((AbstractStage)((Object)other)).transform), this.fnAdapter);
    }

    /**
     * Attaches a {@link HashJoinTransform} joining this stage with one other stage.
     *
     * @param stage1        the stage to join with
     * @param joinClause    join clause for {@code stage1}; adapted before use
     * @param mapToOutputFn combines an item of this stage with the joined item;
     *                      checked for serializability and adapted before use
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <K1, T1_IN, T1, R, RET> RET attachHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause, @Nonnull BiFunctionEx<T, T1, R> mapToOutputFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        // NOTE(review): the meaning of the empty-list argument is defined by
        // HashJoinTransform's constructor, which is not visible here.
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1)), Collections.singletonList(this.fnAdapter.adaptJoinClause(joinClause)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    /**
     * Attaches a {@link HashJoinTransform} joining this stage with two other stages.
     *
     * @param stage1        the first stage to join with
     * @param joinClause1   join clause for {@code stage1}; adapted before use
     * @param stage2        the second stage to join with
     * @param joinClause2   join clause for {@code stage2}; adapted before use
     * @param mapToOutputFn combines an item of this stage with the two joined
     *                      items; checked for serializability and adapted before use
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <K1, T1_IN, T1, K2, T2_IN, T2, R, TA, RET> RET attachHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull TriFunction<T, T1, T2, R> mapToOutputFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapToOutputFn, "mapToOutputFn");
        // NOTE(review): the meaning of the empty-list argument is defined by
        // HashJoinTransform's constructor, which is not visible here.
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1), ComputeStageImplBase.transformOf(stage2)), Arrays.asList(this.fnAdapter.adaptJoinClause(joinClause1), this.fnAdapter.adaptJoinClause(joinClause2)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    /**
     * Attaches a {@link PeekTransform} built from the adapted predicate and the
     * adapted string-conversion function.
     *
     * @param shouldLogFn predicate passed to the peek transform; checked for serializability
     * @param toStringFn  converts an item to text; checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <RET> RET attachPeek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(shouldLogFn, "shouldLogFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(toStringFn, "toStringFn");
        return this.attach(new PeekTransform(this.transform, this.fnAdapter.adaptFilterFn(shouldLogFn), this.fnAdapter.adaptToStringFn(toStringFn)), this.fnAdapter);
    }

    /**
     * Attaches the transform built by
     * {@code ProcessorTransform.customProcessorTransform} for a user-supplied
     * processor meta-supplier.
     *
     * @param stageName    name passed on to the transform factory
     * @param procSupplier supplier of the processors
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <RET> RET attachCustomTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier) {
        return this.attach(ProcessorTransform.customProcessorTransform(stageName, this.transform, procSupplier), this.fnAdapter);
    }

    /**
     * Partitioned variant of {@link #attachCustomTransform}: additionally
     * adapts {@code partitionKeyFn} and builds the transform with
     * {@code PartitionedProcessorTransform.partitionedCustomProcessorTransform}.
     *
     * @param stageName      name passed on to the transform factory
     * @param procSupplier   supplier of the processors
     * @param partitionKeyFn extracts the partitioning key from an item;
     *                       checked for serializability
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    <K, RET> RET attachPartitionedCustomTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier, @Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn) {
        // Validate the key function up front, as every other partitioned attach method does.
        com.hazelcast.jet.impl.util.Util.checkSerializable(partitionKeyFn, "partitionKeyFn");
        FunctionEx<?, ? extends K> adaptedKeyFn = this.fnAdapter.adaptKeyFn(partitionKeyFn);
        return this.attach(PartitionedProcessorTransform.partitionedCustomProcessorTransform(stageName, this.transform, procSupplier, adaptedKeyFn), this.fnAdapter);
    }

    /**
     * Connects this stage to the given sink and returns the resulting sink stage.
     *
     * @param sink the sink to drain to; cast to {@link SinkImpl}, so any other
     *             implementation fails with a {@code ClassCastException}
     * @return the new sink stage
     */
    @Nonnull
    public SinkStage drainTo(@Nonnull Sink<? super T> sink) {
        SinkImpl sinkImpl = (SinkImpl)sink;
        // The boolean tells the sink transform whether this stage uses the JetEvent adapter.
        SinkTransform sinkTransform = new SinkTransform(sinkImpl, this.transform, this.fnAdapter == ADAPT_TO_JET_EVENT);
        SinkStageImpl output = new SinkStageImpl(sinkTransform, this.pipelineImpl);
        sinkImpl.onAssignToStage();
        this.pipelineImpl.connect(this.transform, sinkTransform);
        return output;
    }

    /**
     * Attaches the given transform to the pipeline and returns the stage that
     * represents it; the concrete behavior is defined by the subclass.
     *
     * @param var1 the transform to attach
     * @param var2 the function adapter passed along by the {@code attachXxx} helpers
     *             (always this stage's {@link #fnAdapter} in this class)
     * @return the subclass-specific result, typed by the caller through {@code RET}
     */
    @Nonnull
    abstract <RET> RET attach(@Nonnull AbstractTransform var1, @Nonnull FunctionAdapter var2);

    /**
     * Verifies that the given stage uses {@link #ADAPT_TO_JET_EVENT}, i.e. that
     * it went through {@link #addTimestamps}.
     *
     * @param stage the stage to check
     * @param name  text used as the subject at the start of the error message
     * @throws IllegalStateException if the stage's adapter is not {@link #ADAPT_TO_JET_EVENT}
     */
    static void ensureJetEvents(@Nonnull ComputeStageImplBase<?> stage, @Nonnull String name) {
        if (stage.fnAdapter != ADAPT_TO_JET_EVENT) {
            throw new IllegalStateException(name + " is missing a timestamp definition. Call one of the .addTimestamps() methods on it before performing the aggregation.");
        }
    }
}

