/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/**
 * Coordinates background offset storage for consumers that have offset tracking enabled.
 *
 * <p>Every registered consumer gets a {@link Tracker}. The first registration schedules one
 * recurring task on the environment's scheduled executor. On each run the task refreshes the
 * shared {@link LocalClock}, asks every tracker whose consumer is still open to flush, and drops
 * the trackers whose consumer is no longer open.
 */
class OffsetTrackingCoordinator {
    private final StreamEnvironment streamEnvironment;
    /** Flipped to {@code true} by the first registration, which schedules the flush task. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Collection<Tracker> trackers = ConcurrentHashMap.newKeySet();
    /** Time source shared by all trackers; refreshed at the start of each flush pass. */
    private final LocalClock clock = new LocalClock();
    /** Guard that makes a flush pass a no-op while another pass is still running. */
    private final AtomicBoolean flushingOnGoing = new AtomicBoolean(false);
    private final Duration checkInterval;
    /** Handle on the scheduled flush task; {@code null} until the first registration. */
    private volatile Future<?> checkFuture;

    /**
     * Creates a coordinator that checks trackers every second.
     *
     * @param streamEnvironment environment providing the scheduled executor
     */
    OffsetTrackingCoordinator(StreamEnvironment streamEnvironment) {
        this(streamEnvironment, Duration.ofSeconds(1L));
    }

    /**
     * Creates a coordinator with a custom check interval.
     *
     * @param streamEnvironment environment providing the scheduled executor
     * @param checkInterval initial delay and period of the background flush task
     */
    OffsetTrackingCoordinator(StreamEnvironment streamEnvironment, Duration checkInterval) {
        this.streamEnvironment = streamEnvironment;
        this.checkInterval = checkInterval;
    }

    /**
     * Registers a consumer for background offset tracking and, on the first call, schedules the
     * periodic flush task.
     *
     * @param consumer the consumer whose offsets are tracked
     * @param configuration the tracking configuration of the consumer
     * @return the callbacks the consumer must invoke to feed the tracker
     * @throws IllegalArgumentException if tracking is disabled, or if tracking is not automatic
     *     and the manual check interval is zero
     */
    Registration registerTrackingConsumer(StreamConsumer consumer, StreamConsumerBuilder.TrackingConfiguration configuration) {
        if (!configuration.enabled()) {
            throw new IllegalArgumentException("Tracking must be enabled");
        }
        Tracker tracker;
        if (configuration.auto()) {
            tracker = new AutoTrackingTracker(consumer, configuration, this.clock);
        } else {
            if (configuration.manualCheckInterval().isZero()) {
                throw new IllegalArgumentException("There should be no registration if the check interval is 0");
            }
            tracker = new ManualTrackingTracker(consumer, configuration, this.clock);
        }
        this.trackers.add(tracker);
        // Only the caller that wins the CAS schedules the task, so it is scheduled exactly once.
        if (this.started.compareAndSet(false, true)) {
            this.clock.setTime(System.nanoTime());
            long periodInMs = this.checkInterval.toMillis();
            this.checkFuture = this.executor().scheduleAtFixedRate(this::flushTrackers, periodInMs, periodInMs, TimeUnit.MILLISECONDS);
        }
        return new Registration(tracker.postProcessingCallback(), tracker.trackingCallback());
    }

    /**
     * Runs one flush pass: refreshes the clock, flushes the trackers of open consumers and removes
     * the trackers of consumers that are no longer open. Does nothing if a pass is already running.
     */
    private void flushTrackers() {
        if (!this.flushingOnGoing.compareAndSet(false, true)) {
            return;
        }
        try {
            this.clock.setTime(System.nanoTime());
            Iterator<Tracker> iterator = this.trackers.iterator();
            while (iterator.hasNext()) {
                // isInterrupted() does not clear the interrupt flag, so there is nothing to restore.
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }
                Tracker tracker = iterator.next();
                if (!tracker.consumer().isOpen()) {
                    iterator.remove();
                    continue;
                }
                try {
                    tracker.flushIfNecessary();
                } catch (Exception ignored) {
                    // Deliberately contained: an exception escaping a task scheduled with
                    // scheduleAtFixedRate suppresses all subsequent executions, so one failing
                    // consumer would stop offset flushing for every consumer. The flush is
                    // attempted again on the next pass.
                }
            }
        } finally {
            this.flushingOnGoing.set(false);
        }
    }

    private ScheduledExecutorService executor() {
        return this.streamEnvironment.scheduledExecutorService();
    }

    /**
     * Tells whether a consumer with the given configuration must be registered.
     *
     * @param trackingConfiguration the tracking configuration to inspect
     * @return {@code true} if tracking is enabled and either it is not manual or the manual check
     *     interval is not zero
     */
    public boolean needTrackingRegistration(StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        return trackingConfiguration.enabled() && (!trackingConfiguration.manual() || !Duration.ZERO.equals(trackingConfiguration.manualCheckInterval()));
    }

    /** Cancels the background flush task, interrupting it if it is running. */
    void close() {
        // Read the volatile field once so the null check and the call see the same reference.
        Future<?> future = this.checkFuture;
        if (future != null) {
            future.cancel(true);
        }
    }

    /** Holder for a {@link System#nanoTime()} reading that the coordinator refreshes. */
    private static class LocalClock {
        private volatile long time;

        private LocalClock() {
        }

        long time() {
            return this.time;
        }

        public void setTime(long time) {
            this.time = time;
        }
    }

    /**
     * Tracker for manual tracking: remembers the last offset passed to the tracking callback and
     * stores it once the check interval has elapsed without tracking activity.
     */
    private static final class ManualTrackingTracker
    implements Tracker {
        private final StreamConsumer consumer;
        private final LocalClock clock;
        private final long checkIntervalInNs;
        private volatile long lastRequestedOffset = 0L;
        private volatile long lastTrackingActivity = 0L;

        private ManualTrackingTracker(StreamConsumer consumer, StreamConsumerBuilder.TrackingConfiguration configuration, LocalClock clock) {
            this.consumer = consumer;
            this.clock = clock;
            this.checkIntervalInNs = configuration.manualCheckInterval().toNanos();
        }

        /**
         * @return always {@code null}: manual tracking has no post-processing step, so callers
         *     must be prepared for a {@code null} callback
         */
        @Override
        public Consumer<MessageHandler.Context> postProcessingCallback() {
            return null;
        }

        /**
         * Stores the last requested offset if the check interval has elapsed since the last
         * tracking activity and the consumer's last stored offset is lower than that offset.
         */
        @Override
        public void flushIfNecessary() {
            if (this.clock.time() - this.lastTrackingActivity <= this.checkIntervalInNs) {
                return;
            }
            // Query the consumer only once the time condition holds, then snapshot the requested
            // offset so the value that is compared is the value that is stored.
            long storedOffset = this.consumer.lastStoredOffset();
            long requestedOffset = this.lastRequestedOffset;
            if (storedOffset < requestedOffset) {
                this.consumer.store(requestedOffset);
                this.lastTrackingActivity = this.clock.time();
            }
        }

        @Override
        public StreamConsumer consumer() {
            return this.consumer;
        }

        /**
         * @return a callback that records the given offset as the last requested one and marks the
         *     current clock time as the last tracking activity
         */
        @Override
        public LongConsumer trackingCallback() {
            return requestedOffset -> {
                this.lastRequestedOffset = requestedOffset;
                this.lastTrackingActivity = this.clock.time();
            };
        }
    }

    /**
     * Tracker for automatic tracking: stores the offset every {@code messageCountBeforeStorage}
     * messages from the post-processing callback, and stores the last processed offset from the
     * background task once the flush interval has elapsed without tracking activity.
     */
    private static final class AutoTrackingTracker
    implements Tracker {
        private final StreamConsumer consumer;
        private final int messageCountBeforeStorage;
        private final long flushIntervalInNs;
        private final LocalClock clock;
        // NOTE(review): incremented with a non-atomic ++ in the post-processing callback; this
        // presumes the callback is never invoked concurrently for one consumer - confirm.
        private volatile long count = 0L;
        private volatile long lastProcessedOffset = 0L;
        private volatile long lastTrackingActivity = 0L;

        private AutoTrackingTracker(StreamConsumer consumer, StreamConsumerBuilder.TrackingConfiguration configuration, LocalClock clock) {
            this.consumer = consumer;
            this.messageCountBeforeStorage = configuration.autoMessageCountBeforeStorage();
            this.flushIntervalInNs = configuration.autoFlushInterval().toNanos();
            this.clock = clock;
        }

        /**
         * @return a callback that counts the message, stores its offset when the count reaches a
         *     multiple of {@code messageCountBeforeStorage}, and records the offset as the last
         *     processed one
         */
        @Override
        public Consumer<MessageHandler.Context> postProcessingCallback() {
            return context -> {
                if (++this.count % (long)this.messageCountBeforeStorage == 0L) {
                    context.storeOffset();
                    this.lastTrackingActivity = this.clock.time();
                }
                this.lastProcessedOffset = context.offset();
            };
        }

        /**
         * Stores the last processed offset if at least one message has been counted, the flush
         * interval has elapsed since the last tracking activity, and the consumer's last stored
         * offset is lower than the last processed offset.
         */
        @Override
        public void flushIfNecessary() {
            if (this.count <= 0L) {
                return;
            }
            if (this.clock.time() - this.lastTrackingActivity <= this.flushIntervalInNs) {
                return;
            }
            // Query the consumer only once the cheaper conditions hold, then snapshot the processed
            // offset so the value that is compared is the value that is stored.
            long storedOffset = this.consumer.lastStoredOffset();
            long processedOffset = this.lastProcessedOffset;
            if (storedOffset < processedOffset) {
                this.consumer.store(processedOffset);
                this.lastTrackingActivity = this.clock.time();
            }
        }

        @Override
        public StreamConsumer consumer() {
            return this.consumer;
        }

        /** @return a no-op callback: automatic tracking ignores explicit tracking requests */
        @Override
        public LongConsumer trackingCallback() {
            return Utils.NO_OP_LONG_CONSUMER;
        }
    }

    /** Pair of callbacks handed back to a consumer when it registers for tracking. */
    static class Registration {
        private final Consumer<MessageHandler.Context> postMessageProcessingCallback;
        private final LongConsumer trackingCallback;

        Registration(Consumer<MessageHandler.Context> postMessageProcessingCallback, LongConsumer trackingCallback) {
            this.postMessageProcessingCallback = postMessageProcessingCallback;
            this.trackingCallback = trackingCallback;
        }

        /** @return the post-processing callback; {@code null} for a manual-tracking registration */
        public Consumer<MessageHandler.Context> postMessageProcessingCallback() {
            return this.postMessageProcessingCallback;
        }

        /** @return the callback to invoke with an offset when tracking is requested */
        public LongConsumer trackingCallback() {
            return this.trackingCallback;
        }
    }

    /** Per-consumer tracking strategy driven by the coordinator's background task. */
    private interface Tracker {
        /** @return the callback to run after a message has been processed; may be {@code null} */
        Consumer<MessageHandler.Context> postProcessingCallback();

        /** Stores the pending offset if the tracker's own conditions are met. */
        void flushIfNecessary();

        /** @return the consumer this tracker belongs to */
        StreamConsumer consumer();

        /** @return the callback to invoke with an offset when tracking is requested */
        LongConsumer trackingCallback();
    }
}

