/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.frame.KeepAliveFrameFlyweight;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

abstract class KeepAliveHandler
implements Disposable {
    final ByteBufAllocator allocator;
    private final Duration keepAlivePeriod;
    private final long keepAliveTimeout;
    private volatile ResumeStateHolder resumeStateHolder;
    private final UnicastProcessor<ByteBuf> sent = UnicastProcessor.create();
    private final MonoProcessor<KeepAlive> timeout = MonoProcessor.create();
    private final AtomicReference<Disposable> intervalDisposable = new AtomicReference();
    private volatile long lastReceivedMillis;

    static KeepAliveHandler ofServer(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) {
        return new Server(allocator, keepAlivePeriod, keepAliveTimeout);
    }

    static KeepAliveHandler ofClient(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) {
        return new Client(allocator, keepAlivePeriod, keepAliveTimeout);
    }

    private KeepAliveHandler(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) {
        this.allocator = allocator;
        this.keepAlivePeriod = keepAlivePeriod;
        this.keepAliveTimeout = keepAliveTimeout.toMillis();
    }

    public void start() {
        this.lastReceivedMillis = System.currentTimeMillis();
        this.intervalDisposable.compareAndSet(null, Flux.interval((Duration)this.keepAlivePeriod).subscribe(v -> this.onIntervalTick()));
    }

    public void dispose() {
        Disposable d = this.intervalDisposable.getAndSet(Disposables.disposed());
        if (d != null) {
            d.dispose();
        }
        this.sent.onComplete();
        this.timeout.onComplete();
    }

    public long receive(ByteBuf keepAliveFrame) {
        this.lastReceivedMillis = System.currentTimeMillis();
        long remoteLastReceivedPos = KeepAliveFrameFlyweight.lastPosition(keepAliveFrame);
        if (KeepAliveFrameFlyweight.respondFlag(keepAliveFrame)) {
            long localLastReceivedPos = this.obtainLastReceivedPos();
            this.doSend(KeepAliveFrameFlyweight.encode(this.allocator, false, localLastReceivedPos, KeepAliveFrameFlyweight.data(keepAliveFrame).retain()));
        }
        return remoteLastReceivedPos;
    }

    public void resumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
    }

    public Flux<ByteBuf> send() {
        return this.sent;
    }

    public Mono<KeepAlive> timeout() {
        return this.timeout;
    }

    abstract void onIntervalTick();

    void doSend(ByteBuf frame) {
        this.sent.onNext((Object)frame);
    }

    void doCheckTimeout() {
        long now = System.currentTimeMillis();
        if (now - this.lastReceivedMillis >= this.keepAliveTimeout) {
            this.timeout.onNext((Object)new KeepAlive(this.keepAlivePeriod.toMillis(), this.keepAliveTimeout));
        }
    }

    long obtainLastReceivedPos() {
        return this.resumeStateHolder != null ? this.resumeStateHolder.impliedPosition() : 0L;
    }

    public static final class KeepAlive {
        private final long tickPeriod;
        private final long timeoutMillis;

        public KeepAlive(long tickPeriod, long timeoutMillis) {
            this.tickPeriod = tickPeriod;
            this.timeoutMillis = timeoutMillis;
        }

        public long getTickPeriod() {
            return this.tickPeriod;
        }

        public long getTimeoutMillis() {
            return this.timeoutMillis;
        }
    }

    private static final class Client
    extends KeepAliveHandler {
        Client(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) {
            super(allocator, keepAlivePeriod, keepAliveTimeout);
        }

        @Override
        void onIntervalTick() {
            this.doCheckTimeout();
            this.doSend(KeepAliveFrameFlyweight.encode(this.allocator, true, this.obtainLastReceivedPos(), Unpooled.EMPTY_BUFFER));
        }
    }

    private static class Server
    extends KeepAliveHandler {
        Server(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) {
            super(allocator, keepAlivePeriod, keepAliveTimeout);
        }

        @Override
        void onIntervalTick() {
            this.doCheckTimeout();
        }
    }
}

