package org.opennms.nephron;

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.opennms.nephron.Pipeline;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;

/* loaded from: input_file:org/opennms/nephron/FlowTimestampPolicy.class */
public class FlowTimestampPolicy extends TimestampPolicy<String, FlowDocument> {
    private final Duration maxDelay;
    private Instant maxEventTimestamp;

    public FlowTimestampPolicy(Duration duration, Optional<Instant> optional) {
        this.maxDelay = duration;
        this.maxEventTimestamp = optional.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE).plus(duration);
    }

    public Instant getTimestampForRecord(TimestampPolicy.PartitionContext partitionContext, KafkaRecord<String, FlowDocument> kafkaRecord) {
        return getTimestampForRecord(partitionContext, kafkaRecord, Instant.now());
    }

    @VisibleForTesting
    public Instant getTimestampForRecord(TimestampPolicy.PartitionContext partitionContext, KafkaRecord<String, FlowDocument> kafkaRecord, Instant instant) {
        Instant timestamp = Pipeline.ReadFromKafka.getTimestamp((FlowDocument) kafkaRecord.getKV().getValue());
        if (timestamp.isAfter(this.maxEventTimestamp) && timestamp.isBefore(instant)) {
            this.maxEventTimestamp = timestamp;
        }
        return timestamp;
    }

    public Instant getWatermark(TimestampPolicy.PartitionContext partitionContext) {
        return this.maxEventTimestamp.minus(this.maxDelay);
    }
}
