/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.runtime.BundleManager;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BundleManager} that tracks a single in-progress bundle.
 *
 * <p>A bundle is opened by the first call to {@link #tryStartBundle()} and is closed by
 * {@link #tryFinishBundle(OpEmitter)} once the element-count limit or the time limit is reached,
 * or once the held watermark equals {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. A watermark that
 * arrives while a bundle is open (or pending) is held back and handed to the
 * {@link BundleManager.BundleProgressListener} right after the bundle finishes.
 *
 * <p>When {@code maxBundleSize > 1}, a processing-time timer identified by
 * {@code bundleCheckTimerId} is scheduled so that {@link #processTimer} periodically re-evaluates
 * whether the open bundle should be closed.
 *
 * @param <OutT> output element type passed through to the listener and emitter
 */
public class PortableBundleManager<@UnknownKeyFor OutT> implements BundleManager<OutT> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PortableBundleManager.class);
    /** Fixed slack, in milliseconds, added to every bundle-check timer delay. */
    private static final @UnknownKeyFor @NonNull @Initialized long MIN_BUNDLE_CHECK_TIME_MS = 10L;

    /** A bundle is finished once it contains at least this many elements. */
    private final @UnknownKeyFor @NonNull @Initialized long maxBundleSize;
    /** A bundle is finished once it has been open for at least this many milliseconds. */
    private final @UnknownKeyFor @NonNull @Initialized long maxBundleTimeMs;
    /** Receives bundle-started, bundle-finished and watermark callbacks. */
    private final @UnknownKeyFor @NonNull @Initialized BundleManager.BundleProgressListener<OutT> bundleProgressListener;
    /** Scheduler used to register the recurring bundle-check timer. */
    private final @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void>> bundleTimerScheduler;
    /** Timer id that distinguishes the bundle-check timer from other timers. */
    private final @UnknownKeyFor @NonNull @Initialized String bundleCheckTimerId;

    // Bundle-tracking state. Initialized at declaration so it exists before the constructor
    // schedules the first bundle-check timer.
    /** Number of elements added to the currently open bundle. */
    private final @UnknownKeyFor @NonNull @Initialized AtomicLong currentBundleElementCount = new AtomicLong(0L);
    /** Incremented when a bundle starts; decremented when it finishes or fails. */
    private final @UnknownKeyFor @NonNull @Initialized AtomicLong pendingBundleCount = new AtomicLong(0L);
    /** Start time (epoch millis) of the open bundle; {@code Long.MAX_VALUE} when none is open. */
    private final @UnknownKeyFor @NonNull @Initialized AtomicLong bundleStartTime = new AtomicLong(Long.MAX_VALUE);
    /** Whether a bundle is currently open. */
    private final @UnknownKeyFor @NonNull @Initialized AtomicBoolean isBundleStarted = new AtomicBoolean(false);
    /** Latest watermark held back while a bundle is open; {@code null} when nothing is held. */
    private volatile @UnknownKeyFor @Nullable @Initialized Instant bundleWatermarkHold;

    /**
     * Creates a bundle manager and, when {@code maxBundleSize > 1}, schedules the first
     * bundle-check timer.
     *
     * @param bundleProgressListener listener notified of bundle start/finish and watermarks
     * @param maxBundleSize element count at which a bundle is finished
     * @param maxBundleTimeMs bundle age, in milliseconds, at which a bundle is finished
     * @param bundleTimerScheduler scheduler used to register the bundle-check timer
     * @param bundleCheckTimerId timer id used for the bundle-check timer
     */
    public PortableBundleManager(@UnknownKeyFor @NonNull @Initialized BundleManager.BundleProgressListener<OutT> bundleProgressListener, @UnknownKeyFor @NonNull @Initialized long maxBundleSize, @UnknownKeyFor @NonNull @Initialized long maxBundleTimeMs, @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void>> bundleTimerScheduler, @UnknownKeyFor @NonNull @Initialized String bundleCheckTimerId) {
        this.maxBundleSize = maxBundleSize;
        this.maxBundleTimeMs = maxBundleTimeMs;
        this.bundleProgressListener = bundleProgressListener;
        this.bundleTimerScheduler = bundleTimerScheduler;
        this.bundleCheckTimerId = bundleCheckTimerId;
        // The periodic check is only scheduled when a bundle can hold more than one element.
        if (maxBundleSize > 1L) {
            this.scheduleNextBundleCheck();
        }
    }

    /**
     * Registers a processing-time timer that fires at
     * {@code now + maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS}.
     */
    private void scheduleNextBundleCheck() {
        Instant nextBundleCheckTime = Instant.now().plus(Duration.millis(this.maxBundleTimeMs / 2L + MIN_BUNDLE_CHECK_TIME_MS));
        TimerInternals.TimerData timerData = TimerInternals.TimerData.of(this.bundleCheckTimerId, StateNamespaces.global(), nextBundleCheckTime, nextBundleCheckTime, TimeDomain.PROCESSING_TIME);
        // The bundle-check timer is not tied to a key: empty key bytes and a null key.
        KeyedTimerData<Void> bundleCheckTimer = new KeyedTimerData<>(new byte[0], null, timerData);
        this.bundleTimerScheduler.schedule(bundleCheckTimer, nextBundleCheckTime.getMillis());
    }

    /**
     * Opens a new bundle if none is open (notifying the listener), then counts one element
     * against the open bundle.
     */
    @Override
    public void tryStartBundle() {
        this.inconsistentStateCheck();
        // Pass `this` rather than this.toString() so formatting only happens when debug is on.
        LOG.debug("tryStartBundle: elementCount={}, Bundle={}", this.currentBundleElementCount, this);
        if (this.isBundleStarted.compareAndSet(false, true)) {
            LOG.debug("Starting a new bundle.");
            this.bundleStartTime.set(System.currentTimeMillis());
            this.pendingBundleCount.getAndIncrement();
            this.bundleProgressListener.onBundleStarted();
        }
        this.currentBundleElementCount.incrementAndGet();
    }

    /**
     * Forwards the watermark to the listener immediately when no bundle is open or pending;
     * otherwise stores it as the held watermark, replacing any previously held value.
     *
     * @param watermark the incoming watermark
     * @param emitter emitter passed to the listener when the watermark is forwarded
     */
    @Override
    public void processWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        if (this.shouldProcessWatermark()) {
            LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark);
            this.bundleProgressListener.onWatermark(watermark, emitter);
            return;
        }
        this.bundleWatermarkHold = watermark;
    }

    /**
     * Handles a fired timer. Only the bundle-check timer is acted upon: it attempts to finish
     * the open bundle and then schedules the next check. Other timers are ignored here.
     *
     * @param keyedTimerData the fired timer
     * @param emitter emitter used if the bundle is finished
     */
    @Override
    public void processTimer(@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void> keyedTimerData, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        this.inconsistentStateCheck();
        if (this.bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
            this.tryFinishBundle(emitter);
            this.scheduleNextBundleCheck();
        }
    }

    /**
     * Logs the failure and resets the bundle-tracking state so that no bundle is considered open.
     *
     * <p>NOTE(review): the pending-bundle count is decremented unconditionally and the held
     * watermark is not cleared; confirm callers only invoke this while a bundle is open.
     *
     * @param t the error that caused processing to fail
     */
    @Override
    public void signalFailure(@UnknownKeyFor @NonNull @Initialized Throwable t) {
        this.inconsistentStateCheck();
        LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
        this.isBundleStarted.set(false);
        this.currentBundleElementCount.set(0L);
        this.bundleStartTime.set(Long.MAX_VALUE);
        this.pendingBundleCount.decrementAndGet();
    }

    /**
     * Finishes the open bundle if a finish condition holds (see {@link #shouldFinishBundle()}).
     * On finish, the tracking state is reset, the listener is notified, and any held watermark is
     * forwarded to the listener afterwards.
     *
     * @param emitter emitter passed to the listener callbacks
     */
    @Override
    public void tryFinishBundle(@UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        LOG.debug("tryFinishBundle: elementCount={}", this.currentBundleElementCount);
        this.inconsistentStateCheck();
        if (this.shouldFinishBundle() && this.isBundleStarted.compareAndSet(true, false)) {
            LOG.debug("Finishing the current bundle. Bundle={}", this);
            this.currentBundleElementCount.set(0L);
            this.bundleStartTime.set(Long.MAX_VALUE);
            // Take the held watermark and clear the hold before invoking listener callbacks.
            Instant watermarkHold = this.bundleWatermarkHold;
            this.bundleWatermarkHold = null;
            this.pendingBundleCount.decrementAndGet();
            this.bundleProgressListener.onBundleFinished(emitter);
            if (watermarkHold != null) {
                this.bundleProgressListener.onWatermark(watermarkHold, emitter);
            }
        }
    }

    /**
     * Logs a warning when no bundle is open but the element count is non-zero. Diagnostic only:
     * it does not modify any state.
     */
    public void inconsistentStateCheck() {
        if (!this.isBundleStarted.get() && this.currentBundleElementCount.get() != 0L) {
            LOG.warn("Bundle is in a inconsistent state. isBundleStarted = false, but currentBundleElementCount = {}", this.currentBundleElementCount);
        }
    }

    /**
     * @return {@code true} when no bundle is open and the pending-bundle count is zero, i.e. a
     *     watermark may be forwarded without being held
     */
    private @UnknownKeyFor @NonNull @Initialized boolean shouldProcessWatermark() {
        return !this.isBundleStarted.get() && this.pendingBundleCount.get() == 0L;
    }

    /**
     * @return {@code true} when the element count has reached {@code maxBundleSize}, the bundle
     *     has been open for at least {@code maxBundleTimeMs}, or the held watermark equals
     *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     */
    private @UnknownKeyFor @NonNull @Initialized boolean shouldFinishBundle() {
        return this.currentBundleElementCount.get() >= this.maxBundleSize
                || System.currentTimeMillis() - this.bundleStartTime.get() >= this.maxBundleTimeMs
                || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(this.bundleWatermarkHold);
    }
}

