/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

@FunctionalInterface
public interface TimestampPolicyFactory<@UnknownKeyFor KeyT, @UnknownKeyFor ValueT>
extends Serializable {
    public @UnknownKeyFor @NonNull @Initialized TimestampPolicy<KeyT, ValueT> createTimestampPolicy(@UnknownKeyFor @NonNull @Initialized TopicPartition var1, @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Instant> var2);

    public static <K, V> @UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> withProcessingTime() {
        return (tp, prev) -> new ProcessingTimePolicy();
    }

    public static <K, V> @UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> withLogAppendTime() {
        return (tp, previousWatermark) -> new LogAppendTimePolicy(previousWatermark);
    }

    public static <K, V> @UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> withCreateTime(@UnknownKeyFor @NonNull @Initialized Duration maxDelay) {
        SerializableFunction & Serializable timestampFunction = (SerializableFunction & Serializable)record -> {
            Preconditions.checkArgument((record.getTimestampType() == KafkaTimestampType.CREATE_TIME ? 1 : 0) != 0, (String)"Kafka record's timestamp is not 'CREATE_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", (Object)record.getTopic(), (Object)record.getPartition(), (Object)record.getOffset(), (Object)((Object)record.getTimestampType()));
            return new Instant(record.getTimestamp());
        };
        return (tp, previousWatermark) -> new CustomTimestampPolicyWithLimitedDelay(timestampFunction, maxDelay, previousWatermark);
    }

    @Deprecated
    public static <K, V> @UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> withTimestampFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFn) {
        return (tp, previousWatermark) -> new TimestampFnPolicy(timestampFn, previousWatermark);
    }

    public static class TimestampFnPolicy<@UnknownKeyFor K, @UnknownKeyFor V>
    extends TimestampPolicy<K, V> {
        final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFn;
        @UnknownKeyFor @NonNull @Initialized Instant lastRecordTimestamp;

        TimestampFnPolicy(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFn, @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Instant> previousWatermark) {
            this.timestampFn = timestampFn;
            this.lastRecordTimestamp = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized Instant getTimestampForRecord(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext context, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V> record) {
            this.lastRecordTimestamp = (Instant)this.timestampFn.apply(record);
            return this.lastRecordTimestamp;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized Instant getWatermark(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext context) {
            return this.lastRecordTimestamp;
        }
    }

    public static class LogAppendTimePolicy<@UnknownKeyFor K, @UnknownKeyFor V>
    extends TimestampPolicy<K, V> {
        private static final @UnknownKeyFor @NonNull @Initialized Duration IDLE_WATERMARK_DELTA = Duration.standardSeconds((long)2L);
        protected @UnknownKeyFor @NonNull @Initialized Instant currentWatermark;

        public LogAppendTimePolicy(@UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized Instant getTimestampForRecord(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext context, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V> record) {
            if (record.getTimestampType().equals((Object)KafkaTimestampType.LOG_APPEND_TIME)) {
                this.currentWatermark = new Instant(record.getTimestamp());
            } else if (this.currentWatermark.equals((Object)BoundedWindow.TIMESTAMP_MIN_VALUE)) {
                throw new IllegalStateException(String.format("LogAppendTimePolicy policy is enabled in reader, but Kafka record's timestamp type is LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. Actual timestamp type is '%s'.", new Object[]{record.getTopic(), record.getTimestampType()}));
            }
            return this.currentWatermark;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized Instant getWatermark(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext context) {
            Instant idleWatermark;
            if (context.getMessageBacklog() == 0L && (idleWatermark = context.getBacklogCheckTime().minus((ReadableDuration)IDLE_WATERMARK_DELTA)).isAfter((ReadableInstant)this.currentWatermark)) {
                this.currentWatermark = idleWatermark;
            }
            return this.currentWatermark;
        }
    }

    public static class ProcessingTimePolicy<@UnknownKeyFor K, @UnknownKeyFor V>
    extends TimestampPolicy<K, V> {
        @Override
        public @UnknownKeyFor @NonNull @Initialized Instant getTimestampForRecord(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext context, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V> record) {
            return Instant.now();
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized Instant getWatermark(@UnknownKeyFor @NonNull @Initialized TimestampPolicy.PartitionContext context) {
            return Instant.now();
        }
    }
}

