/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkPolicies;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBiPredicate;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.function.DistributedTriFunction;
import com.hazelcast.jet.impl.pipeline.AbstractStage;
import com.hazelcast.jet.impl.pipeline.FunctionAdapter;
import com.hazelcast.jet.impl.pipeline.JetEvent;
import com.hazelcast.jet.impl.pipeline.JetEventFunctionAdapter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.SinkImpl;
import com.hazelcast.jet.impl.pipeline.SinkStageImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.FilterTransform;
import com.hazelcast.jet.impl.pipeline.transform.FilterUsingContextTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapUsingContextTransform;
import com.hazelcast.jet.impl.pipeline.transform.HashJoinTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapUsingContextTransform;
import com.hazelcast.jet.impl.pipeline.transform.PeekTransform;
import com.hazelcast.jet.impl.pipeline.transform.ProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.ContextFactory;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.util.Preconditions;
import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nonnull;

public abstract class ComputeStageImplBase<T>
extends AbstractStage {
    static final FunctionAdapter DONT_ADAPT = new FunctionAdapter();
    static final JetEventFunctionAdapter ADAPT_TO_JET_EVENT = new JetEventFunctionAdapter();
    private static final WatermarkEmissionPolicy THROWING_EMIT_POLICY = (currentWm, lastEmittedWm) -> {
        throw new IllegalStateException("emit policy should have been replaced");
    };
    @Nonnull
    public FunctionAdapter fnAdapter;

    ComputeStageImplBase(@Nonnull Transform transform, @Nonnull FunctionAdapter fnAdapter, @Nonnull PipelineImpl pipelineImpl, boolean acceptsDownstream) {
        super(transform, acceptsDownstream, pipelineImpl);
        this.fnAdapter = fnAdapter;
    }

    @Nonnull
    public StreamStage<T> addTimestamps() {
        return this.addTimestamps(o -> System.currentTimeMillis(), 0L);
    }

    @Nonnull
    public StreamStage<T> addTimestamps(DistributedToLongFunction<? super T> timestampFn, long allowedLateness) {
        Preconditions.checkFalse((boolean)this.hasJetEvents(), (String)"This stage already has timestamps assigned to it.");
        DistributedSupplier<WatermarkPolicy> wmPolicy = WatermarkPolicies.limitingLag(allowedLateness);
        WatermarkGenerationParams<? super T> wmParams = WatermarkGenerationParams.wmGenParams(timestampFn, JetEvent::jetEvent, wmPolicy, THROWING_EMIT_POLICY, 60000L);
        if (this.transform instanceof StreamSourceTransform) {
            ((StreamSourceTransform)this.transform).setWmGenerationParams(wmParams);
            this.fnAdapter = ADAPT_TO_JET_EVENT;
            return (StreamStage)((Object)this);
        }
        TimestampTransform tsTransform = new TimestampTransform(this.transform, wmParams);
        this.pipelineImpl.connect(this.transform, tsTransform);
        return new StreamStageImpl(tsTransform, ADAPT_TO_JET_EVENT, this.pipelineImpl);
    }

    @Nonnull
    <R, RET> RET attachMap(@Nonnull DistributedFunction<? super T, ? extends R> mapFn) {
        return this.attach(new MapTransform(this.transform, this.fnAdapter.adaptMapFn(mapFn)), this.fnAdapter);
    }

    @Nonnull
    <C, R, RET> RET attachMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C, ? super T, ? extends R> mapFn) {
        return this.attach(new MapUsingContextTransform(this.transform, contextFactory, this.fnAdapter.adaptMapUsingContextFn(mapFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachFilter(@Nonnull DistributedPredicate<T> filterFn) {
        return this.attach(new FilterTransform(this.transform, this.fnAdapter.adaptFilterFn(filterFn)), this.fnAdapter);
    }

    @Nonnull
    <C, RET> RET attachFilterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiPredicate<? super C, ? super T> filterFn) {
        return this.attach(new FilterUsingContextTransform<C, T>(this.transform, contextFactory, this.fnAdapter.adaptFilterUsingContextFn(filterFn)), this.fnAdapter);
    }

    @Nonnull
    <R, RET> RET attachFlatMap(@Nonnull DistributedFunction<? super T, ? extends Traverser<? extends R>> flatMapFn) {
        return this.attach(new FlatMapTransform(this.transform, this.fnAdapter.adaptFlatMapFn(flatMapFn)), this.fnAdapter);
    }

    @Nonnull
    <C, R, RET> RET attachFlatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        return this.attach(new FlatMapUsingContextTransform(this.transform, contextFactory, this.fnAdapter.adaptFlatMapUsingContextFn(flatMapFn)), this.fnAdapter);
    }

    @Nonnull
    <K1, T1_IN, T1, R, RET> RET attachHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause, @Nonnull DistributedBiFunction<T, T1, R> mapToOutputFn) {
        return this.attach(new HashJoinTransform(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1)), Collections.singletonList(this.fnAdapter.adaptJoinClause(joinClause)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    @Nonnull
    <K1, T1_IN, T1, K2, T2_IN, T2, R, RET> RET attachHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull DistributedTriFunction<T, T1, T2, R> mapToOutputFn) {
        return this.attach(new HashJoinTransform<Object, Object>(Arrays.asList(this.transform, ComputeStageImplBase.transformOf(stage1), ComputeStageImplBase.transformOf(stage2)), Arrays.asList(this.fnAdapter.adaptJoinClause(joinClause1), this.fnAdapter.adaptJoinClause(joinClause2)), Collections.emptyList(), this.fnAdapter.adaptHashJoinOutputFn(mapToOutputFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachPeek(@Nonnull DistributedPredicate<? super T> shouldLogFn, @Nonnull DistributedFunction<? super T, ? extends CharSequence> toStringFn) {
        return this.attach(new PeekTransform(this.transform, this.fnAdapter.adaptFilterFn(shouldLogFn), this.fnAdapter.adaptToStringFn(toStringFn)), this.fnAdapter);
    }

    @Nonnull
    <RET> RET attachCustomTransform(@Nonnull String stageName, @Nonnull DistributedSupplier<Processor> procSupplier) {
        return this.attach(new ProcessorTransform(this.transform, stageName, procSupplier), this.fnAdapter);
    }

    @Nonnull
    public SinkStage drainTo(@Nonnull Sink<? super T> sink) {
        SinkImpl sinkImpl = (SinkImpl)sink;
        SinkTransform sinkTransform = new SinkTransform(sinkImpl, this.transform, this.fnAdapter == ADAPT_TO_JET_EVENT);
        SinkStageImpl output = new SinkStageImpl(sinkTransform, this.pipelineImpl);
        sinkImpl.onAssignToStage();
        this.pipelineImpl.connect(this.transform, sinkTransform);
        return output;
    }

    @Nonnull
    abstract <RET> RET attach(@Nonnull AbstractTransform var1, @Nonnull FunctionAdapter var2);

    private boolean hasJetEvents() {
        return this.fnAdapter.equals(ADAPT_TO_JET_EVENT);
    }

    static void ensureJetEvents(@Nonnull ComputeStageImplBase stage, @Nonnull String name) {
        if (stage.fnAdapter != ADAPT_TO_JET_EVENT) {
            throw new IllegalStateException(name + " is missing a timestamp definition. Call one of the .addTimestamps() methods on it before performing the aggregation.");
        }
    }
}

