/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.frame.KeepAliveFrameCodec;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Base class for keep-alive handling: runs a periodic tick on a scheduler, records when the
 * last keep-alive frame was received, answers incoming keep-alive frames that carry the
 * respond flag, and reports a timeout when nothing was received within the configured period.
 *
 * <p>What happens on every tick is defined by subclasses through {@link #onIntervalTick()}.
 * Callbacks and the resume state holder are optional and are configured through the fluent
 * setters; they are held in volatile fields and each method reads them once into a local
 * before use.
 */
public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
    final ByteBufAllocator allocator;
    final Scheduler scheduler;
    final Duration keepAliveInterval;
    final Duration keepAliveTimeout;
    final long keepAliveTimeoutMillis;
    /** True between a successful {@link #start()} and the following {@link #stop()}. */
    final AtomicBoolean started = new AtomicBoolean();
    volatile Consumer<KeepAlive> onTimeout;
    volatile Consumer<ByteBuf> onFrameSent;
    /** Subscription of the tick interval; {@code null} until {@link #start()} has run. */
    volatile Disposable ticksDisposable;
    volatile ResumeStateHolder resumeStateHolder;
    /** Scheduler time, in milliseconds, of the last start or received keep-alive frame. */
    volatile long lastReceivedMillis;

    /**
     * Creates the support using {@code Schedulers.parallel()} for ticks and time readings.
     *
     * @param allocator allocator handed to the frame codec when encoding keep-alive frames
     * @param keepAliveInterval tick period in milliseconds
     * @param keepAliveTimeout maximum time in milliseconds without a received keep-alive
     */
    private KeepAliveSupport(ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
        this.allocator = allocator;
        this.scheduler = Schedulers.parallel();
        this.keepAliveInterval = Duration.ofMillis(keepAliveInterval);
        this.keepAliveTimeout = Duration.ofMillis(keepAliveTimeout);
        this.keepAliveTimeoutMillis = keepAliveTimeout;
    }

    /**
     * Resets the last-received timestamp to the scheduler's current time and, if not already
     * started, subscribes to the periodic tick that drives {@link #onIntervalTick()}.
     *
     * @return this instance
     */
    public KeepAliveSupport start() {
        lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
        if (started.compareAndSet(false, true)) {
            ticksDisposable = Flux.interval(keepAliveInterval, scheduler).subscribe(v -> onIntervalTick());
        }
        return this;
    }

    /** Disposes the tick subscription if this instance is currently started; otherwise a no-op. */
    public void stop() {
        if (started.compareAndSet(true, false)) {
            // The flag is flipped by start() before the subscription is stored, so the field
            // may still be null here; guard instead of throwing NullPointerException.
            Disposable ticks = ticksDisposable;
            if (ticks != null) {
                ticks.dispose();
            }
        }
    }

    /**
     * Handles an incoming keep-alive frame: updates the last-received timestamp, passes the
     * position carried by the frame to the resume state holder when one is set, and, when the
     * frame has the respond flag, sends back a keep-alive frame carrying the local position
     * and the received data.
     *
     * @param keepAliveFrame the received keep-alive frame
     */
    @Override
    public void receive(ByteBuf keepAliveFrame) {
        lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
        ResumeStateHolder holder = resumeStateHolder;
        if (holder != null) {
            long remoteLastReceivedPos = remoteLastReceivedPosition(keepAliveFrame);
            holder.onImpliedPosition(remoteLastReceivedPos);
        }
        if (KeepAliveFrameCodec.respondFlag(keepAliveFrame)) {
            long localLastReceivedPos = localLastReceivedPosition();
            // The data is retained because it is handed over to the outgoing frame.
            // The boolean passed as false is presumably the respond flag of the reply.
            send(KeepAliveFrameCodec.encode(allocator, false, localLastReceivedPos, KeepAliveFrameCodec.data(keepAliveFrame).retain()));
        }
    }

    /**
     * Sets the resume state holder consulted for local and remote positions.
     *
     * @param resumeStateHolder the holder, or {@code null} for none
     * @return this instance
     */
    public KeepAliveSupport resumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
        return this;
    }

    /**
     * Sets the consumer that receives every outgoing keep-alive frame.
     *
     * @param onFrameSent the consumer, or {@code null} for none
     * @return this instance
     */
    public KeepAliveSupport onSendKeepAliveFrame(Consumer<ByteBuf> onFrameSent) {
        this.onFrameSent = onFrameSent;
        return this;
    }

    /**
     * Sets the callback invoked when the keep-alive timeout elapses.
     *
     * @param onTimeout the callback, or {@code null} for none
     * @return this instance
     */
    public KeepAliveSupport onTimeout(Consumer<KeepAlive> onTimeout) {
        this.onTimeout = onTimeout;
        return this;
    }

    /** Invoked on every tick of the keep-alive interval while started. */
    abstract void onIntervalTick();

    /**
     * Hands an outgoing frame to the registered consumer. When no consumer is registered the
     * frame is released here, since nothing else would ever release it.
     *
     * @param frame the encoded keep-alive frame; the caller gives up its reference
     */
    void send(ByteBuf frame) {
        Consumer<ByteBuf> sink = onFrameSent;
        if (sink != null) {
            sink.accept(frame);
        } else {
            frame.release();
        }
    }

    /**
     * Checks whether the time since the last received keep-alive has reached the timeout; if
     * so, notifies the timeout callback (when set) and then stops ticking.
     */
    void tryTimeout() {
        long now = scheduler.now(TimeUnit.MILLISECONDS);
        if (now - lastReceivedMillis >= keepAliveTimeoutMillis) {
            Consumer<KeepAlive> timeoutHandler = onTimeout;
            if (timeoutHandler != null) {
                timeoutHandler.accept(new KeepAlive(keepAliveInterval, keepAliveTimeout));
            }
            stop();
        }
    }

    /**
     * @return the implied position reported by the resume state holder, or {@code 0} when no
     *     holder is set
     */
    long localLastReceivedPosition() {
        ResumeStateHolder holder = resumeStateHolder;
        return holder != null ? holder.impliedPosition() : 0L;
    }

    /**
     * @param keepAliveFrame a received keep-alive frame
     * @return the last position decoded from the frame
     */
    long remoteLastReceivedPosition(ByteBuf keepAliveFrame) {
        return KeepAliveFrameCodec.lastPosition(keepAliveFrame);
    }

    /** Equivalent to {@link #stop()}. */
    public void dispose() {
        stop();
    }

    /**
     * @return {@code true} if the tick subscription exists and has been disposed;
     *     {@code false} if it is active or {@link #start()} has never been called
     */
    public boolean isDisposed() {
        Disposable ticks = ticksDisposable;
        return ticks != null && ticks.isDisposed();
    }

    /** Immutable pair of keep-alive settings handed to the timeout callback. */
    public static final class KeepAlive {
        private final Duration tickPeriod;
        private final Duration timeout;

        /**
         * @param tickPeriod interval between keep-alive ticks
         * @param timeoutMillis timeout as a {@link Duration}
         */
        public KeepAlive(Duration tickPeriod, Duration timeoutMillis) {
            this.tickPeriod = tickPeriod;
            this.timeout = timeoutMillis;
        }

        /** @return the interval between keep-alive ticks */
        public Duration getTickPeriod() {
            return tickPeriod;
        }

        /** @return the keep-alive timeout */
        public Duration getTimeout() {
            return timeout;
        }
    }

    /**
     * Keep-alive support that, on every tick, first checks for a timeout and then sends a
     * keep-alive frame with an empty payload.
     */
    public static final class ClientKeepAliveSupport extends KeepAliveSupport {
        /**
         * @param allocator allocator handed to the frame codec when encoding keep-alive frames
         * @param keepAliveInterval tick period in milliseconds
         * @param keepAliveTimeout maximum time in milliseconds without a received keep-alive
         */
        public ClientKeepAliveSupport(ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
            super(allocator, keepAliveInterval, keepAliveTimeout);
        }

        @Override
        void onIntervalTick() {
            tryTimeout();
            // The boolean passed as true is presumably the respond flag, asking the peer to reply.
            send(KeepAliveFrameCodec.encode(allocator, true, localLastReceivedPosition(), Unpooled.EMPTY_BUFFER));
        }
    }
}

