/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.frame.KeepAliveFrameFlyweight;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/**
 * Base class for keep-alive handling: it runs a periodic tick, records when the last keep-alive
 * frame was received, answers incoming keep-alive frames that ask for a response, and reports a
 * timeout when nothing has been received for {@code keepAliveTimeout} milliseconds.
 *
 * <p>Callbacks and the resume state holder are held in {@code volatile} fields and may be
 * replaced at any time; every method reads such a field once into a local before using it so a
 * concurrent change cannot cause a {@link NullPointerException} between the check and the call.
 */
public abstract class KeepAliveSupport
implements KeepAliveFramesAcceptor {
    final ByteBufAllocator allocator;
    private final Duration keepAliveInterval;
    private final Duration keepAliveTimeout;
    private final long keepAliveTimeoutMillis;
    private volatile Consumer<KeepAlive> onTimeout;
    private volatile Consumer<ByteBuf> onFrameSent;
    private volatile Disposable ticksDisposable;
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile ResumeStateHolder resumeStateHolder;
    private volatile long lastReceivedMillis;

    /**
     * @param allocator allocator handed to {@link KeepAliveFrameFlyweight} when encoding frames
     * @param keepAliveInterval period between ticks, in milliseconds
     * @param keepAliveTimeout maximum time without a received keep-alive frame, in milliseconds
     */
    private KeepAliveSupport(ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
        this.allocator = allocator;
        this.keepAliveInterval = Duration.ofMillis(keepAliveInterval);
        this.keepAliveTimeout = Duration.ofMillis(keepAliveTimeout);
        this.keepAliveTimeoutMillis = keepAliveTimeout;
    }

    /**
     * Resets the last-received timestamp to now and, if not already started, subscribes to a
     * periodic interval that invokes {@link #onIntervalTick()}.
     *
     * @return this instance
     */
    public KeepAliveSupport start() {
        this.lastReceivedMillis = System.currentTimeMillis();
        if (this.started.compareAndSet(false, true)) {
            Disposable ticks = Flux.interval(this.keepAliveInterval).subscribe(v -> this.onIntervalTick());
            this.ticksDisposable = ticks;
            // stop() may have run between the compareAndSet above and the assignment; in that
            // case it could not see this subscription, so cancel it here.
            if (!this.started.get()) {
                ticks.dispose();
            }
        }
        return this;
    }

    /**
     * Cancels the periodic ticks if this instance is currently started; otherwise does nothing.
     */
    public void stop() {
        if (this.started.compareAndSet(true, false)) {
            // The field is assigned after `started` is flipped in start(), so it can still be
            // null here when stop() races with start().
            Disposable ticks = this.ticksDisposable;
            if (ticks != null) {
                ticks.dispose();
            }
        }
    }

    /**
     * Handles an incoming keep-alive frame: refreshes the last-received timestamp, forwards the
     * position carried by the frame to the resume state holder (if one is set), and, when the
     * frame's respond flag is set, sends back a keep-alive frame carrying the same data.
     *
     * @param keepAliveFrame the received keep-alive frame
     */
    @Override
    public void receive(ByteBuf keepAliveFrame) {
        this.lastReceivedMillis = System.currentTimeMillis();
        ResumeStateHolder holder = this.resumeStateHolder;
        if (holder != null) {
            holder.onImpliedPosition(this.remoteLastReceivedPosition(keepAliveFrame));
        }
        if (KeepAliveFrameFlyweight.respondFlag(keepAliveFrame)) {
            long localLastReceivedPos = this.localLastReceivedPosition();
            // The data is retained because it is reused inside the outgoing frame.
            ByteBuf data = KeepAliveFrameFlyweight.data(keepAliveFrame).retain();
            this.send(KeepAliveFrameFlyweight.encode(this.allocator, false, localLastReceivedPos, data));
        }
    }

    /**
     * Sets the holder used to read the local position and to report the remote one.
     *
     * @param resumeStateHolder the holder, or {@code null} to disable position tracking
     * @return this instance
     */
    public KeepAliveSupport resumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
        return this;
    }

    /**
     * Sets the consumer that receives every keep-alive frame produced by this instance.
     *
     * @param onFrameSent consumer of outgoing frames
     * @return this instance
     */
    public KeepAliveSupport onSendKeepAliveFrame(Consumer<ByteBuf> onFrameSent) {
        this.onFrameSent = onFrameSent;
        return this;
    }

    /**
     * Sets the consumer notified when the keep-alive timeout elapses.
     *
     * @param onTimeout consumer receiving the configured interval and timeout
     * @return this instance
     */
    public KeepAliveSupport onTimeout(Consumer<KeepAlive> onTimeout) {
        this.onTimeout = onTimeout;
        return this;
    }

    /** Invoked on every interval tick while started. */
    abstract void onIntervalTick();

    /**
     * Hands the frame to the registered frame consumer. When no consumer is registered the frame
     * is released, since nothing else would ever release it.
     *
     * @param frame the outgoing frame
     */
    void send(ByteBuf frame) {
        Consumer<ByteBuf> sink = this.onFrameSent;
        if (sink != null) {
            sink.accept(frame);
        } else {
            // NOTE(review): assumes the encoded frame owns its data component, so this also
            // balances the retain() done in receive() — confirm against KeepAliveFrameFlyweight.
            frame.release();
        }
    }

    /**
     * Checks whether at least {@code keepAliveTimeout} milliseconds have passed since the last
     * received keep-alive frame (or since {@link #start()}); if so, notifies the timeout consumer
     * (when set) and stops the ticks. The ticks are stopped even if the consumer throws.
     */
    void tryTimeout() {
        long now = System.currentTimeMillis();
        if (now - this.lastReceivedMillis < this.keepAliveTimeoutMillis) {
            return;
        }
        try {
            Consumer<KeepAlive> callback = this.onTimeout;
            if (callback != null) {
                callback.accept(new KeepAlive(this.keepAliveInterval, this.keepAliveTimeout));
            }
        } finally {
            this.stop();
        }
    }

    /**
     * @return the implied position reported by the resume state holder, or {@code 0} when no
     *     holder is set
     */
    long localLastReceivedPosition() {
        ResumeStateHolder holder = this.resumeStateHolder;
        return holder != null ? holder.impliedPosition() : 0L;
    }

    /**
     * @param keepAliveFrame a received keep-alive frame
     * @return the last position encoded in the frame
     */
    long remoteLastReceivedPosition(ByteBuf keepAliveFrame) {
        return KeepAliveFrameFlyweight.lastPosition(keepAliveFrame);
    }

    /** Immutable pair of the tick period and the timeout, passed to the timeout consumer. */
    public static final class KeepAlive {
        private final Duration tickPeriod;
        private final Duration timeout;

        /**
         * @param tickPeriod period between keep-alive ticks
         * @param timeoutMillis the timeout as a {@link Duration} (despite the parameter name, no
         *     unit conversion is applied)
         */
        public KeepAlive(Duration tickPeriod, Duration timeoutMillis) {
            this.tickPeriod = tickPeriod;
            this.timeout = timeoutMillis;
        }

        /** @return the period between keep-alive ticks */
        public Duration getTickPeriod() {
            return this.tickPeriod;
        }

        /** @return the keep-alive timeout */
        public Duration getTimeout() {
            return this.timeout;
        }
    }

    /**
     * Keep-alive support that, on every tick, checks for a timeout and then sends a keep-alive
     * frame.
     */
    public static final class ClientKeepAliveSupport
    extends KeepAliveSupport {
        /**
         * @param allocator allocator used to encode outgoing frames
         * @param keepAliveInterval period between ticks, in milliseconds
         * @param keepAliveTimeout keep-alive timeout, in milliseconds
         */
        public ClientKeepAliveSupport(ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
            super(allocator, keepAliveInterval, keepAliveTimeout);
        }

        @Override
        void onIntervalTick() {
            this.tryTimeout();
            // Encoded with `true` as the second argument and an empty payload; the flag is
            // presumably the "respond" flag read back by respondFlag() — confirm.
            this.send(KeepAliveFrameFlyweight.encode(this.allocator, true, this.localLastReceivedPosition(), Unpooled.EMPTY_BUFFER));
        }
    }

    /** Keep-alive support that only checks for a timeout on every tick and sends nothing. */
    public static final class ServerKeepAliveSupport
    extends KeepAliveSupport {
        /**
         * @param allocator allocator used to encode response frames
         * @param keepAlivePeriod period between ticks, in milliseconds
         * @param keepAliveTimeout keep-alive timeout, in milliseconds
         */
        public ServerKeepAliveSupport(ByteBufAllocator allocator, int keepAlivePeriod, int keepAliveTimeout) {
            super(allocator, keepAlivePeriod, keepAliveTimeout);
        }

        @Override
        void onIntervalTick() {
            this.tryTimeout();
        }
    }
}

