/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * A {@link TimestampPolicy} that takes each record's timestamp from a caller-supplied function
 * and, by default, reports a watermark that trails the largest timestamp seen so far by
 * {@code maxDelay}.
 *
 * <p>Two situations override that default watermark; they are described on
 * {@link #getWatermark(TimestampPolicy.PartitionContext, Instant)}.
 */
public class CustomTimestampPolicyWithLimitedDelay<@UnknownKeyFor K, @UnknownKeyFor V>
extends TimestampPolicy<K, V> {
    /** Amount subtracted from a reference instant to obtain the watermark. */
    private final @UnknownKeyFor @NonNull @Initialized Duration maxDelay;
    /** Maps a Kafka record to the timestamp reported for it. */
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFunction;
    /** Largest timestamp observed so far; seeded in the constructor, only ever moved forward. */
    private @UnknownKeyFor @NonNull @Initialized Instant maxEventTimestamp;

    /**
     * Creates the policy.
     *
     * @param timestampFunction function applied to every record to obtain its timestamp
     * @param maxDelay how far the watermark trails the reference instant
     * @param previousWatermark watermark to resume from; when empty,
     *     {@code BoundedWindow.TIMESTAMP_MIN_VALUE} is used instead
     */
    public CustomTimestampPolicyWithLimitedDelay(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFunction, @UnknownKeyFor @NonNull @Initialized Duration maxDelay, @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Instant> previousWatermark) {
        this.timestampFunction = timestampFunction;
        this.maxDelay = maxDelay;
        // Seed with (starting watermark + maxDelay) so that the default watermark computation,
        // maxEventTimestamp - maxDelay, yields the starting watermark before any record is seen.
        Instant startingWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        this.maxEventTimestamp = startingWatermark.plus(maxDelay);
    }

    /**
     * Returns the timestamp produced by the timestamp function for {@code record} and advances
     * the tracked maximum when that timestamp is strictly later than it.
     *
     * @param ctx partition context; not consulted by this method
     * @param record record whose timestamp is requested
     * @return the timestamp function's result for {@code record}
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized Instant getTimestampForRecord(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext ctx, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V> record) {
        Instant eventTime = timestampFunction.apply(record);
        maxEventTimestamp = eventTime.isAfter(maxEventTimestamp) ? eventTime : maxEventTimestamp;
        return eventTime;
    }

    /**
     * Computes the watermark using the current wall-clock time as "now".
     *
     * @param ctx partition context supplying backlog information
     * @return the watermark, as computed by the two-argument overload
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized Instant getWatermark(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext ctx) {
        return getWatermark(ctx, Instant.now());
    }

    /**
     * Computes the watermark relative to an explicit {@code now}.
     *
     * <p>The result is {@code maxEventTimestamp - maxDelay}, except that:
     * <ul>
     *   <li>when the tracked maximum is after {@code now}, the result is
     *       {@code now - maxDelay};</li>
     *   <li>otherwise, when the context reports a zero backlog, its backlog check time minus
     *       {@code maxDelay} is after the tracked maximum, and the tracked maximum has a
     *       positive millisecond value, the result is {@code backlogCheckTime - maxDelay}.</li>
     * </ul>
     *
     * @param ctx partition context supplying backlog size and backlog check time
     * @param now instant treated as the current time
     * @return the watermark
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized Instant getWatermark(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext ctx, @UnknownKeyFor @NonNull @Initialized Instant now) {
        // Tracked maximum lies beyond 'now': base the watermark on 'now' instead.
        if (maxEventTimestamp.isAfter(now)) {
            return now.minus(maxDelay);
        }
        // Nothing backlogged and the backlog check has moved past the tracked maximum.
        if (backlogCheckHasOvertakenMaxTimestamp(ctx)) {
            return ctx.getBacklogCheckTime().minus(maxDelay);
        }
        return maxEventTimestamp.minus(maxDelay);
    }

    /**
     * Tells whether the watermark should follow the context's backlog check time: the backlog is
     * zero, the check time minus {@code maxDelay} is after the tracked maximum, and the tracked
     * maximum has a positive millisecond value. Conditions are tested in that order and
     * evaluation stops at the first one that fails.
     */
    private boolean backlogCheckHasOvertakenMaxTimestamp(TimestampPolicy.PartitionContext ctx) {
        if (ctx.getMessageBacklog() != 0L) {
            return false;
        }
        if (!ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp)) {
            return false;
        }
        return maxEventTimestamp.getMillis() > 0L;
    }
}

