/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.function.ObjLongBiFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.ComputeStageImplBase;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.StreamStageImpl;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.StreamSourceStage;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Objects;
import javax.annotation.Nonnull;

/**
 * {@link StreamSourceStage} implementation that lets the caller decide how the
 * items of a streaming source obtain their timestamps and then hands back the
 * {@link StreamStage} built on top of the same source transform.
 *
 * @param <T> type of the items emitted by the source
 */
public class StreamSourceStageImpl<T> implements StreamSourceStage<T> {
    /**
     * Shared function that wraps an item together with its timestamp into a
     * {@link JetEvent}. It never inspects the item, so one instance can serve
     * every {@code T}; see {@link #wrapToJetEvent()} for the typed view.
     */
    private static final ObjLongBiFunction<?, ? extends JetEvent<?>> WRAP_TO_JET_EVENT =
            (item, ts) -> JetEvent.jetEvent(ts, item);

    private final StreamSourceTransform<T> transform;
    private final PipelineImpl pipeline;

    /**
     * @param transform the source transform this stage configures
     * @param pipeline  the pipeline passed on to the stages created here
     */
    StreamSourceStageImpl(StreamSourceTransform<T> transform, PipelineImpl pipeline) {
        this.transform = transform;
        this.pipeline = pipeline;
    }

    /**
     * Configures the source to use its native timestamps (no timestamp
     * extractor is installed) and returns the resulting stage.
     *
     * @param allowedLag lag handed to {@link WatermarkPolicy#limitingLag(long)}
     * @return a stage whose functions are adapted to {@link JetEvent}s
     */
    @Override
    public StreamStage<T> withNativeTimestamps(long allowedLag) {
        // Rejected by Preconditions.checkTrue when the transform reports no native timestamp support.
        Preconditions.checkTrue(this.transform.supportsNativeTimestamps(),
                "The source doesn't support native timestamps");
        return applyEventTimePolicy(null, allowedLag);
    }

    /**
     * Configures the source to extract the timestamp of each item with
     * {@code timestampFn} and returns the resulting stage.
     *
     * @param timestampFn function that extracts the timestamp from an item;
     *                    must be non-null and pass {@code Util.checkSerializable}
     * @param allowedLag  lag handed to {@link WatermarkPolicy#limitingLag(long)}
     * @return a stage whose functions are adapted to {@link JetEvent}s
     * @throws NullPointerException if {@code timestampFn} is null
     */
    @Override
    public StreamStage<T> withTimestamps(@Nonnull ToLongFunctionEx<? super T> timestampFn, long allowedLag) {
        // A null extractor is how native-timestamp mode is requested (see
        // withNativeTimestamps); reject it here so it cannot slip past the
        // supportsNativeTimestamps() check performed there.
        Objects.requireNonNull(timestampFn, "timestampFn");
        Util.checkSerializable(timestampFn, "timestampFn");
        return applyEventTimePolicy(timestampFn, allowedLag);
    }

    /**
     * Returns a stage without installing any event time policy. The function
     * adapter depends on whether the transform reports that it already emits
     * {@link JetEvent}s.
     *
     * @return the stage built on the unmodified source transform
     */
    @Override
    public StreamStage<T> withoutTimestamps() {
        return new StreamStageImpl<>(
                this.transform,
                this.transform.emitsJetEvents()
                        ? ComputeStageImplBase.ADAPT_TO_JET_EVENT
                        : ComputeStageImplBase.DO_NOT_ADAPT,
                this.pipeline);
    }

    /**
     * Installs the event time policy on the source transform and creates the
     * stage that consumes the resulting {@link JetEvent}s.
     *
     * @param timestampFn timestamp extractor, or {@code null} for native timestamps
     * @param allowedLag  lag handed to {@link WatermarkPolicy#limitingLag(long)}
     */
    private StreamStage<T> applyEventTimePolicy(ToLongFunctionEx<? super T> timestampFn, long allowedLag) {
        // NOTE(review): the two 0L arguments are presumably the watermark throttling
        // frame size and offset - confirm against EventTimePolicy.eventTimePolicy.
        this.transform.setEventTimePolicy(EventTimePolicy.eventTimePolicy(
                timestampFn,
                this.wrapToJetEvent(),
                WatermarkPolicy.limitingLag(allowedLag),
                0L,
                0L,
                this.transform.partitionIdleTimeout()));
        return new StreamStageImpl<>(this.transform, ComputeStageImplBase.ADAPT_TO_JET_EVENT, this.pipeline);
    }

    /**
     * Returns {@link #WRAP_TO_JET_EVENT} typed for this stage's item type.
     */
    @SuppressWarnings("unchecked")
    private ObjLongBiFunction<T, JetEvent<T>> wrapToJetEvent() {
        // Safe: the shared lambda only forwards the item to JetEvent.jetEvent
        // and does not depend on its static type.
        return (ObjLongBiFunction<T, JetEvent<T>>) WRAP_TO_JET_EVENT;
    }
}

