/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.runtime.FutureCollector;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BundleManager<@UnknownKeyFor OutT> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BundleManager.class);
    private static final @UnknownKeyFor @NonNull @Initialized long MIN_BUNDLE_CHECK_TIME_MS = 10L;
    private final @UnknownKeyFor @NonNull @Initialized long maxBundleSize;
    private final @UnknownKeyFor @NonNull @Initialized long maxBundleTimeMs;
    private final @UnknownKeyFor @NonNull @Initialized BundleProgressListener<OutT> bundleProgressListener;
    private final @UnknownKeyFor @NonNull @Initialized FutureCollector<OutT> futureCollector;
    private final @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void>> bundleTimerScheduler;
    private final @UnknownKeyFor @NonNull @Initialized String bundleCheckTimerId;
    private transient @UnknownKeyFor @NonNull @Initialized AtomicLong currentBundleElementCount;
    private transient @UnknownKeyFor @NonNull @Initialized AtomicLong pendingBundleCount;
    private transient @UnknownKeyFor @NonNull @Initialized AtomicLong bundleStartTime;
    private transient @UnknownKeyFor @NonNull @Initialized AtomicBoolean isBundleStarted;
    private transient @UnknownKeyFor @NonNull @Initialized Instant bundleWatermarkHold;
    private transient @UnknownKeyFor @NonNull @Initialized AtomicReference<@UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void>> currentActiveBundleDoneFutureReference;
    private transient @UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void> watermarkFuture;

    public BundleManager(@UnknownKeyFor @NonNull @Initialized BundleProgressListener<OutT> bundleProgressListener, @UnknownKeyFor @NonNull @Initialized FutureCollector<OutT> futureCollector, @UnknownKeyFor @NonNull @Initialized long maxBundleSize, @UnknownKeyFor @NonNull @Initialized long maxBundleTimeMs, @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void>> bundleTimerScheduler, @UnknownKeyFor @NonNull @Initialized String bundleCheckTimerId) {
        this.maxBundleSize = maxBundleSize;
        this.maxBundleTimeMs = maxBundleTimeMs;
        this.bundleProgressListener = bundleProgressListener;
        this.bundleTimerScheduler = bundleTimerScheduler;
        this.bundleCheckTimerId = bundleCheckTimerId;
        this.futureCollector = futureCollector;
        if (maxBundleSize > 1L) {
            this.scheduleNextBundleCheck();
        }
        this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
        this.currentActiveBundleDoneFutureReference = new AtomicReference();
        this.currentBundleElementCount = new AtomicLong(0L);
        this.isBundleStarted = new AtomicBoolean(false);
        this.pendingBundleCount = new AtomicLong(0L);
        this.watermarkFuture = CompletableFuture.completedFuture(null);
    }

    private void scheduleNextBundleCheck() {
        Instant nextBundleCheckTime = Instant.now().plus((ReadableDuration)Duration.millis((long)(this.maxBundleTimeMs / 2L + 10L)));
        TimerInternals.TimerData timerData = TimerInternals.TimerData.of((String)this.bundleCheckTimerId, (StateNamespace)StateNamespaces.global(), (Instant)nextBundleCheckTime, (Instant)nextBundleCheckTime, (TimeDomain)TimeDomain.PROCESSING_TIME);
        this.bundleTimerScheduler.schedule(new KeyedTimerData<Object>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
    }

    void tryStartBundle() {
        this.futureCollector.prepare();
        if (this.isBundleStarted.compareAndSet(false, true)) {
            LOG.debug("Starting a new bundle.");
            Preconditions.checkArgument((this.currentActiveBundleDoneFutureReference.get() == null ? 1 : 0) != 0, (Object)"Current active bundle done future should be null before starting a new bundle.");
            this.bundleStartTime.set(System.currentTimeMillis());
            this.pendingBundleCount.incrementAndGet();
            this.bundleProgressListener.onBundleStarted();
        }
        this.currentBundleElementCount.incrementAndGet();
    }

    void processWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        if (!this.isBundleStarted() && this.pendingBundleCount.get() == 0L) {
            LOG.debug("Propagating watermark: {} directly since no bundle in progress.", (Object)watermark);
            this.bundleProgressListener.onWatermark(watermark, emitter);
            return;
        }
        this.bundleWatermarkHold = watermark;
        if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals((Object)watermark)) {
            if (this.isBundleStarted()) {
                LOG.info("Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
                this.tryFinishBundle(emitter);
                this.watermarkFuture.toCompletableFuture().join();
            } else {
                LOG.info("Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
                this.watermarkFuture.toCompletableFuture().join();
                this.bundleProgressListener.onWatermark(watermark, emitter);
            }
        }
    }

    void processTimer(@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void> keyedTimerData, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        if (this.bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
            this.tryFinishBundle(emitter);
            this.scheduleNextBundleCheck();
        }
    }

    void signalFailure(@UnknownKeyFor @NonNull @Initialized Throwable t) {
        LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
        this.futureCollector.discard();
        this.isBundleStarted.compareAndSet(true, false);
        if (this.bundleStartTime.get() != Long.MAX_VALUE) {
            this.currentBundleElementCount.set(0L);
            this.bundleStartTime.set(Long.MAX_VALUE);
            this.pendingBundleCount.decrementAndGet();
            this.currentActiveBundleDoneFutureReference.set(null);
        }
    }

    void tryFinishBundle(@UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        CompletionStage<Collection<WindowedValue<OutT>>> outputFuture = this.futureCollector.finish();
        if (this.shouldFinishBundle() && this.isBundleStarted.compareAndSet(true, false)) {
            LOG.debug("Finishing the current bundle.");
            this.currentBundleElementCount.set(0L);
            this.bundleStartTime.set(Long.MAX_VALUE);
            Instant watermarkHold = this.bundleWatermarkHold;
            this.bundleWatermarkHold = null;
            CompletableFuture<Object> currentActiveBundleDoneFuture = (CompletableFuture<Object>)this.currentActiveBundleDoneFutureReference.get();
            outputFuture = outputFuture.thenCombine(currentActiveBundleDoneFuture != null ? currentActiveBundleDoneFuture : CompletableFuture.completedFuture(null), (res, ignored) -> {
                this.bundleProgressListener.onBundleFinished(emitter);
                return res;
            });
            BiConsumer<Collection, Void> watermarkPropagationFn = watermarkHold == null ? (ignored, res) -> this.pendingBundleCount.decrementAndGet() : (ignored, res) -> {
                LOG.debug("Propagating watermark: {} to downstream.", (Object)watermarkHold);
                this.bundleProgressListener.onWatermark(watermarkHold, emitter);
                this.pendingBundleCount.decrementAndGet();
            };
            this.watermarkFuture = outputFuture.thenAcceptBoth(this.watermarkFuture, watermarkPropagationFn);
            this.currentActiveBundleDoneFutureReference.set(null);
        } else if (this.isBundleStarted.get()) {
            CompletableFuture finalOutputFuture = outputFuture.toCompletableFuture();
            this.currentActiveBundleDoneFutureReference.updateAndGet(maybePrevFuture -> {
                CompletableFuture<Object> prevFuture = maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null);
                return CompletableFuture.allOf(prevFuture, finalOutputFuture);
            });
        }
        emitter.emitFuture(outputFuture);
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized long getCurrentBundleElementCount() {
        return this.currentBundleElementCount.longValue();
    }

    @Nullable
    @VisibleForTesting
    @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized CompletionStage<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void> getCurrentBundleDoneFuture() {
        return this.currentActiveBundleDoneFutureReference.get();
    }

    @VisibleForTesting
    void setCurrentBundleDoneFuture(@UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void> currentBundleResultFuture) {
        this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture);
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized long getPendingBundleCount() {
        return this.pendingBundleCount.longValue();
    }

    @VisibleForTesting
    void setPendingBundleCount(@UnknownKeyFor @NonNull @Initialized long value) {
        this.pendingBundleCount.set(value);
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized boolean isBundleStarted() {
        return this.isBundleStarted.get();
    }

    @VisibleForTesting
    void setBundleWatermarkHold(@UnknownKeyFor @NonNull @Initialized Instant watermark) {
        this.bundleWatermarkHold = watermark;
    }

    private @UnknownKeyFor @NonNull @Initialized boolean shouldFinishBundle() {
        return this.isBundleStarted.get() && (this.currentBundleElementCount.get() >= this.maxBundleSize || System.currentTimeMillis() - this.bundleStartTime.get() >= this.maxBundleTimeMs || BoundedWindow.TIMESTAMP_MAX_VALUE.equals((Object)this.bundleWatermarkHold));
    }

    public static interface BundleProgressListener<@UnknownKeyFor OutT> {
        public void onBundleStarted();

        public void onBundleFinished(@UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> var1);

        public void onWatermark(@UnknownKeyFor @NonNull @Initialized Instant var1, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> var2);
    }
}

