/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.decentred.server;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.bytes.BytesOut;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.decentred.api.MessageToListener;
import net.openhft.chronicle.decentred.dto.SignedMessage;
import net.openhft.chronicle.decentred.dto.VanillaSignedMessage;
import net.openhft.chronicle.decentred.server.RunningMessageToListener;
import net.openhft.chronicle.threads.LongPauser;
import net.openhft.chronicle.threads.Pauser;

public class SingleMessageToListener
implements RunningMessageToListener,
Runnable {
    private final Pauser pauser = new LongPauser(0, 10, 1L, 20L, TimeUnit.MILLISECONDS);
    private final MessageToListener xclServer;
    private final AtomicReference<Bytes> writeLock = new AtomicReference();
    private final VanillaSignedMessage signedMessage = new VanillaSignedMessage();
    private final Bytes bytes1 = Bytes.allocateElasticDirect((long)0x2000000L).unchecked(true);
    private final Bytes bytes2 = Bytes.allocateElasticDirect((long)0x2000000L).unchecked(true);

    public SingleMessageToListener(MessageToListener xclServer) {
        this.xclServer = xclServer;
        this.writeLock.set(this.bytes1);
    }

    @Override
    public Runnable[] runnables() {
        return new Runnable[]{this};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onMessageTo(long address, SignedMessage message) {
        Bytes bytes = this.lock();
        try {
            long position = bytes.writePosition();
            bytes.ensureCapacity(position + 65536L);
            bytes.writeInt(0);
            bytes.writeLong(address);
            message.writeMarshallable((BytesOut)bytes);
            bytes.writeInt(position, (int)(bytes.writePosition() - position - 4L));
        }
        finally {
            this.unlock(bytes);
        }
        this.pauser.unpause();
    }

    private Bytes lock() {
        return this.writeLock.getAndSet(null);
    }

    private void unlock(Bytes bytes) {
        this.writeLock.set(bytes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean flush() {
        Bytes other;
        Bytes bytes = this.writeLock.get();
        if (bytes == null) {
            return false;
        }
        Bytes bytes2 = other = bytes == this.bytes1 ? this.bytes2 : this.bytes1;
        if (!this.writeLock.compareAndSet(bytes, other)) {
            return false;
        }
        if (bytes.writePosition() == 0L) {
            return false;
        }
        long limit = bytes.readLimit();
        while (bytes.readRemaining() > 0L) {
            int size = bytes.readInt();
            long end = bytes.readPosition() + (long)size;
            bytes.readLimit(end);
            try {
                long address = bytes.readLong();
                this.signedMessage.readMarshallable((BytesIn)bytes);
                this.xclServer.onMessageTo(address, this.signedMessage);
            }
            finally {
                bytes.readPosition(end);
                bytes.readLimit(limit);
            }
        }
        bytes.clear();
        return true;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                if (this.flush()) {
                    this.pauser.reset();
                    continue;
                }
                this.pauser.pause();
            }
        }
        catch (Throwable t) {
            Jvm.warn().on(this.getClass(), "Writer died", t);
        }
    }
}

