/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.pipeline;

import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkPolicies;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.transform.BatchSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.util.Preconditions;
import javax.annotation.Nonnull;

public final class SourceBuilder<S> {
    private final String mName;
    private final DistributedFunction<? super Processor.Context, ? extends S> mCreateFn;
    private DistributedConsumer<? super S> mDestroyFn = DistributedConsumer.noop();
    private int mPreferredLocalParallelism;

    private SourceBuilder(@Nonnull String name, @Nonnull DistributedFunction<? super Processor.Context, ? extends S> createFn) {
        this.mName = name;
        this.mCreateFn = createFn;
    }

    @Nonnull
    public static <S> Batch<Void> batch(@Nonnull String name, @Nonnull DistributedFunction<? super Processor.Context, ? extends S> createFn) {
        SourceBuilder<S> sourceBuilder = new SourceBuilder<S>(name, createFn);
        sourceBuilder.getClass();
        return sourceBuilder.new Batch<Void>();
    }

    @Nonnull
    public static <S> Stream<Void> stream(@Nonnull String name, @Nonnull DistributedFunction<? super Processor.Context, ? extends S> createFn) {
        SourceBuilder<S> sourceBuilder = new SourceBuilder<S>(name, createFn);
        sourceBuilder.getClass();
        return sourceBuilder.new Stream<Void>();
    }

    @Nonnull
    public static <S> TimestampedStream<Void> timestampedStream(@Nonnull String name, @Nonnull DistributedFunction<? super Processor.Context, ? extends S> createFn) {
        SourceBuilder<S> sourceBuilder = new SourceBuilder<S>(name, createFn);
        sourceBuilder.getClass();
        return sourceBuilder.new TimestampedStream<Void>();
    }

    public final class TimestampedStream<T>
    extends Base<T> {
        private DistributedBiConsumer<? super S, ? super TimestampedSourceBuffer<T>> fillBufferFn;
        private long maxLag;

        private TimestampedStream() {
        }

        @Nonnull
        public <T_NEW> TimestampedStream<T_NEW> fillBufferFn(@Nonnull DistributedBiConsumer<? super S, ? super TimestampedSourceBuffer<T_NEW>> fillBufferFn) {
            TimestampedStream newThis = this;
            newThis.fillBufferFn = fillBufferFn;
            return newThis;
        }

        @Override
        @Nonnull
        public TimestampedStream<T> destroyFn(@Nonnull DistributedConsumer<? super S> pDestroyFn) {
            return (TimestampedStream)super.destroyFn(pDestroyFn);
        }

        @Override
        @Nonnull
        public TimestampedStream<T> distributed(int preferredLocalParallelism) {
            return (TimestampedStream)super.distributed(preferredLocalParallelism);
        }

        @Nonnull
        public TimestampedStream<T> allowedLateness(long allowedLateness) {
            this.maxLag = allowedLateness;
            return this;
        }

        @Nonnull
        public StreamSource<T> build() {
            Preconditions.checkNotNull(this.fillBufferFn, (String)"fillBufferFn must be set");
            StreamSourceTransform<JetEvent> source = new StreamSourceTransform<JetEvent>(SourceBuilder.this.mName, WatermarkGenerationParams.wmGenParams(JetEvent::timestamp, (e, timestamp) -> e, WatermarkPolicies.limitingLag(this.maxLag), WatermarkEmissionPolicy.NULL_EMIT_POLICY, 60000L), wmParams -> SourceProcessors.convenientTimestampedSourceP(SourceBuilder.this.mCreateFn, this.fillBufferFn, wmParams, SourceBuilder.this.mDestroyFn, SourceBuilder.this.mPreferredLocalParallelism), true);
            return source;
        }
    }

    public final class Stream<T>
    extends BaseNoTimestamps<T> {
        private Stream() {
        }

        @Override
        @Nonnull
        public <T_NEW> Stream<T_NEW> fillBufferFn(@Nonnull DistributedBiConsumer<? super S, ? super SourceBuffer<T_NEW>> fillBufferFn) {
            return (Stream)super.fillBufferFn(fillBufferFn);
        }

        @Override
        @Nonnull
        public Stream<T> destroyFn(@Nonnull DistributedConsumer<? super S> pDestroyFn) {
            return (Stream)super.destroyFn(pDestroyFn);
        }

        @Override
        @Nonnull
        public Stream<T> distributed(int preferredLocalParallelism) {
            return (Stream)super.distributed(preferredLocalParallelism);
        }

        @Nonnull
        public StreamSource<T> build() {
            Preconditions.checkNotNull((Object)this.fillBufferFn, (String)"fillBufferFn must be non-null");
            return new StreamSourceTransform(SourceBuilder.this.mName, wmParams -> SourceProcessors.convenientSourceP(SourceBuilder.this.mCreateFn, this.fillBufferFn, SourceBuilder.this.mDestroyFn, SourceBuilder.this.mPreferredLocalParallelism), false);
        }
    }

    public final class Batch<T>
    extends BaseNoTimestamps<T> {
        private Batch() {
        }

        @Override
        @Nonnull
        public <T_NEW> Batch<T_NEW> fillBufferFn(@Nonnull DistributedBiConsumer<? super S, ? super SourceBuffer<T_NEW>> fillBufferFn) {
            return (Batch)super.fillBufferFn(fillBufferFn);
        }

        @Override
        @Nonnull
        public Batch<T> destroyFn(@Nonnull DistributedConsumer<? super S> destroyFn) {
            return (Batch)super.destroyFn(destroyFn);
        }

        @Override
        @Nonnull
        public Batch<T> distributed(int preferredLocalParallelism) {
            return (Batch)super.distributed(preferredLocalParallelism);
        }

        @Nonnull
        public BatchSource<T> build() {
            Preconditions.checkNotNull((Object)this.fillBufferFn, (String)"fillBufferFn must be non-null");
            return new BatchSourceTransform(SourceBuilder.this.mName, SourceProcessors.convenientSourceP(SourceBuilder.this.mCreateFn, this.fillBufferFn, SourceBuilder.this.mDestroyFn, SourceBuilder.this.mPreferredLocalParallelism));
        }
    }

    private abstract class BaseNoTimestamps<T>
    extends Base<T> {
        DistributedBiConsumer<? super S, ? super SourceBuffer<T>> fillBufferFn;

        private BaseNoTimestamps() {
        }

        @Nonnull
        public <T_NEW> BaseNoTimestamps<T_NEW> fillBufferFn(@Nonnull DistributedBiConsumer<? super S, ? super SourceBuffer<T_NEW>> fillBufferFn) {
            BaseNoTimestamps newThis = this;
            newThis.fillBufferFn = fillBufferFn;
            return newThis;
        }
    }

    private abstract class Base<T> {
        private Base() {
        }

        @Nonnull
        public Base<T> destroyFn(@Nonnull DistributedConsumer<? super S> destroyFn) {
            SourceBuilder.this.mDestroyFn = destroyFn;
            return this;
        }

        @Nonnull
        public Base<T> distributed(int preferredLocalParallelism) {
            Preconditions.checkPositive((int)preferredLocalParallelism, (String)"Preferred local parallelism must be positive");
            SourceBuilder.this.mPreferredLocalParallelism = preferredLocalParallelism;
            return this;
        }
    }

    public static interface TimestampedSourceBuffer<T>
    extends SourceBuffer<T> {
        public void add(@Nonnull T var1, long var2);

        @Override
        default public void add(@Nonnull T item) {
            this.add(item, System.currentTimeMillis());
        }
    }

    public static interface SourceBuffer<T> {
        public int size();

        public void close();

        public void add(@Nonnull T var1);
    }
}

