/*
 * Decompiled with CFR 0.152.
 */
package org.apache.htrace.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.StandardCharsets;
import org.apache.htrace.core.Span;
import org.apache.htrace.impl.BufferManager;
import org.apache.htrace.impl.Conf;
import org.apache.htrace.impl.PackedBuffer;
import org.apache.htrace.impl.TimeUtil;
import org.apache.htrace.shaded.commons.logging.Log;
import org.apache.htrace.shaded.commons.logging.LogFactory;

/**
 * A {@link BufferManager} that accumulates spans in a {@link PackedBuffer} and
 * ships them to the endpoint named in {@link Conf} over a non-blocking socket.
 *
 * <p>A request consists of three buffers sent back to back: a fixed-size frame
 * header, a "prequel" written by {@code PackedBuffer.beginWriteSpansRequest},
 * and the span data itself. A new connection is opened for every
 * {@link #flush()} and closed again before the call returns.
 *
 * <p>No synchronization is performed in this class.
 */
class PackedBufferManager
implements BufferManager {
    // NOTE(review): the logger is created for PackedBuffer.class rather than this
    // class; this looks like a copy/paste slip, but it is kept so that existing
    // logging configuration keeps working.
    private static final Log LOG = LogFactory.getLog(PackedBuffer.class);

    /** Capacity, in bytes, of the buffer that holds the request prequel. */
    private static final int MAX_PREQUEL_LENGTH = 2048;

    /** Method id placed in the request frame and expected in the response frame. */
    private static final int METHOD_ID_WRITE_SPANS = 1;

    /** Sequence number placed in the request frame and expected in the response frame. */
    private static final long REQUEST_SEQ = 1L;

    /** Size, in bytes, of the request frame and of the response frame. */
    private static final int FRAME_LENGTH = 20;

    /** Largest request body (prequel plus spans), in bytes, that prepare() accepts. */
    private static final long MAX_BODY_LENGTH = 0x4000000L;

    /** Largest server error message, in bytes, that is accepted in a response. */
    private static final int MAX_ERROR_LENGTH = 0x400000;

    private final Conf conf;
    private final ByteBuffer frameBuffer;
    private final PackedBuffer prequel;
    private final PackedBuffer spans;
    private final Selector selector;
    private int numSpans;

    /**
     * Allocates the frame, prequel and span buffers and opens the selector used
     * for all socket I/O.
     *
     * @param conf supplies the span buffer size, the endpoint and the timeouts
     * @throws IOException if the selector cannot be opened
     */
    PackedBufferManager(Conf conf) throws IOException {
        this.conf = conf;
        this.frameBuffer = ByteBuffer.allocate(FRAME_LENGTH);
        this.prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH));
        this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize));
        this.selector = SelectorProvider.provider().openSelector();
        this.clear();
    }

    /**
     * Appends one span to the span buffer and bumps the span count.
     *
     * @param span the span to serialize
     * @throws IOException if {@code PackedBuffer.writeSpan} fails
     */
    @Override
    public void writeSpan(Span span) throws IOException {
        spans.writeSpan(span);
        ++numSpans;
        if (LOG.isTraceEnabled()) {
            LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " + conf.endpointStr + ". numSpans = " + numSpans + ", buffer position = " + spans.getBuffer().position());
        }
    }

    /**
     * @return the current position of the span buffer
     */
    @Override
    public int contentLength() {
        return spans.getBuffer().position();
    }

    /**
     * @return the number of spans written since the last {@link #clear()}
     */
    @Override
    public int getNumberOfSpans() {
        return numSpans;
    }

    /**
     * Writes the prequel and the request frame for the buffered spans and flips
     * all three buffers so that they are ready to be sent by {@link #flush()}.
     *
     * @throws IOException if the prequel cannot be written or the request body
     *         would exceed the maximum body length
     */
    @Override
    public void prepare() throws IOException {
        prequel.beginWriteSpansRequest(null, numSpans);
        // Widen before adding so that two large int positions cannot overflow.
        long totalLength = (long) prequel.getBuffer().position() + spans.getBuffer().position();
        if (totalLength > MAX_BODY_LENGTH) {
            throw new IOException("Can't send RPC of " + totalLength + " bytes " + "because it is longer than " + MAX_BODY_LENGTH);
        }
        PackedBuffer.writeReqFrame(frameBuffer, METHOD_ID_WRITE_SPANS, REQUEST_SEQ, (int) totalLength);
        frameBuffer.flip();
        prequel.getBuffer().flip();
        spans.getBuffer().flip();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Preparing to send RPC of length " + (totalLength + FRAME_LENGTH) + " to " + conf.endpointStr + ", containing " + numSpans + " spans.");
        }
    }

    /**
     * Connects to the endpoint, sends the prepared frame, prequel and spans, and
     * reads and validates the response frame. The socket is always closed before
     * this method returns.
     *
     * <p>The three request buffers are rewound to position 0 first and are not
     * used to hold the response, so the same prepared request can be flushed
     * again after a failure.
     *
     * @throws IOException if connecting, sending or receiving fails or times
     *         out, or if the server answers with an unexpected frame or an error;
     *         a failure to close the socket is attached as a suppressed exception
     */
    @Override
    public void flush() throws IOException {
        SelectionKey sockKey = null;
        IOException ioe = null;
        frameBuffer.position(0);
        prequel.getBuffer().position(0);
        spans.getBuffer().position(0);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Preparing to flush " + numSpans + " spans to " + conf.endpointStr);
        }
        try {
            sockKey = doConnect();
            doSend(sockKey, new ByteBuffer[]{frameBuffer, prequel.getBuffer(), spans.getBuffer()});
            // Use a dedicated buffer for the response: reading it into the prequel
            // buffer would overwrite the prepared request data.
            ByteBuffer response = ByteBuffer.allocate(FRAME_LENGTH);
            readAndValidateResponseFrame(sockKey, response, REQUEST_SEQ, METHOD_ID_WRITE_SPANS);
        }
        catch (IOException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got exception during flush", e);
            }
            ioe = e;
        }
        finally {
            if (sockKey != null) {
                sockKey.cancel();
                try {
                    SocketChannel sock = (SocketChannel) sockKey.attachment();
                    sock.close();
                }
                catch (IOException e) {
                    if (ioe != null) {
                        ioe.addSuppressed(e);
                    } else if (LOG.isDebugEnabled()) {
                        // The flush itself succeeded, so a close failure is not
                        // reported to the caller.
                        LOG.debug("Ignoring error closing socket to " + conf.endpointStr, e);
                    }
                }
            }
        }
        if (ioe != null) {
            throw ioe;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Successfully flushed " + numSpans + " spans to " + conf.endpointStr);
        }
    }

    /**
     * Computes how much of a timeout is left.
     *
     * @param startMs   time at which the operation started, as returned by
     *                  {@code TimeUtil.nowMs()}
     * @param timeoutMs total time allowed for the operation
     * @return the remaining time in milliseconds, or 0 once the timeout has been
     *         used up
     */
    private long updateRemainingMs(long startMs, long timeoutMs) {
        long deltaMs = TimeUtil.deltaMs(startMs, TimeUtil.nowMs());
        if (deltaMs > timeoutMs) {
            return 0L;
        }
        return timeoutMs - deltaMs;
    }

    /**
     * Opens a non-blocking socket to the configured endpoint and waits for the
     * connection to be established.
     *
     * @return the selection key of the connected socket; the socket channel is
     *         the key's attachment
     * @throws IOException if the connection fails or is not established within
     *         {@code conf.connectTimeoutMs}; the socket is closed in that case
     */
    private SelectionKey doConnect() throws IOException {
        SocketChannel sock = SocketChannel.open();
        SelectionKey sockKey = null;
        boolean success = false;
        try {
            if (sock.isBlocking()) {
                sock.configureBlocking(false);
            }
            InetSocketAddress resolvedEndpoint = new InetSocketAddress(conf.endpoint.getHostString(), conf.endpoint.getPort());
            // Return value unused; presumably called to force the name lookup up
            // front -- TODO confirm.
            resolvedEndpoint.getHostName();
            boolean connectedImmediately = sock.connect(resolvedEndpoint);
            sockKey = sock.register(selector, SelectionKey.OP_CONNECT, sock);
            if (connectedImmediately) {
                // A non-blocking connect may complete right away (e.g. for a local
                // endpoint); there is then no pending connection to wait for.
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Successfully connected to " + conf.endpointStr + ".");
                }
                success = true;
                return sockKey;
            }
            long startMs = TimeUtil.nowMs();
            long remainingMs = conf.connectTimeoutMs;
            do {
                selector.select(remainingMs);
                for (SelectionKey key : selector.keys()) {
                    if (!key.isConnectable()) {
                        continue;
                    }
                    SocketChannel s = (SocketChannel) key.attachment();
                    s.finishConnect();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Successfully connected to " + conf.endpointStr + ".");
                    }
                    success = true;
                    return sockKey;
                }
                remainingMs = updateRemainingMs(startMs, conf.connectTimeoutMs);
            } while (remainingMs != 0L);
            throw new IOException("Attempt to connect to " + conf.endpointStr + " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) + " ms.");
        }
        finally {
            if (!success) {
                if (sockKey != null) {
                    sockKey.cancel();
                }
                sock.close();
            }
        }
    }

    /**
     * Writes the given buffers, in order, to the socket attached to
     * {@code sockKey}.
     *
     * @param sockKey key of a connected socket, with the channel as attachment
     * @param bufs    buffers to send; each is consumed up to its limit
     * @throws IOException if writing fails or the buffers are not fully written
     *         within {@code conf.ioTimeoutMs}
     */
    private void doSend(SelectionKey sockKey, ByteBuffer[] bufs) throws IOException {
        long totalWritten = 0L;
        sockKey.interestOps(SelectionKey.OP_WRITE);
        SocketChannel sock = (SocketChannel) sockKey.attachment();
        long startMs = TimeUtil.nowMs();
        long remainingMs = conf.ioTimeoutMs;
        do {
            selector.select(remainingMs);
            for (SelectionKey key : selector.selectedKeys()) {
                if (!key.isWritable()) {
                    continue;
                }
                // Buffers that are already drained contribute nothing to a
                // gathering write, so the whole array can be passed every time.
                long written = sock.write(bufs);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Sent " + written + " bytes to " + conf.endpointStr);
                }
                totalWritten += written;
            }
            boolean allSent = true;
            for (ByteBuffer buf : bufs) {
                if (buf.hasRemaining()) {
                    allSent = false;
                    break;
                }
            }
            if (allSent) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Finished sending " + totalWritten + " bytes to " + conf.endpointStr);
                }
                return;
            }
            remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
        } while (remainingMs != 0L);
        throw new IOException("Attempt to write to " + conf.endpointStr + " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) + " ms.");
    }

    /**
     * Reads from the socket attached to {@code sockKey} until {@code response}
     * has no space remaining.
     *
     * @param sockKey  key of a connected socket, with the channel as attachment
     * @param response buffer to fill, from its position up to its limit
     * @throws IOException if reading fails, the peer closes the connection before
     *         the buffer is full, or the buffer is not filled within
     *         {@code conf.ioTimeoutMs}
     */
    private void doRecv(SelectionKey sockKey, ByteBuffer response) throws IOException {
        sockKey.interestOps(SelectionKey.OP_READ);
        SocketChannel sock = (SocketChannel) sockKey.attachment();
        int totalRead = response.remaining();
        long startMs = TimeUtil.nowMs();
        long remainingMs = conf.ioTimeoutMs;
        while (remainingMs > 0L) {
            selector.select(remainingMs);
            for (SelectionKey key : selector.selectedKeys()) {
                if (!key.isReadable()) {
                    continue;
                }
                if (sock.read(response) < 0) {
                    // End of stream: no more data will ever arrive, so fail now
                    // instead of retrying until the timeout expires.
                    throw new IOException("Connection to " + conf.endpointStr + " was closed by the remote end after " + (totalRead - response.remaining()) + " out of " + totalRead + " bytes were received.");
                }
            }
            if (!response.hasRemaining()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Received all " + totalRead + " bytes from " + conf.endpointStr);
                }
                return;
            }
            remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received " + (totalRead - response.remaining()) + " out of " + totalRead + " bytes from " + conf.endpointStr);
            }
        }
        // Reached when the timeout is used up, or was not positive to begin with.
        throw new IOException("Attempt to read from " + conf.endpointStr + " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) + " ms.");
    }

    /**
     * Reads the fixed-size response frame and checks it against the request.
     * The frame is decoded as little-endian: an 8-byte sequence number, a 4-byte
     * method id, a 4-byte error length and one more 4-byte field that is read
     * and discarded. If the error length is non-zero, that many bytes are read
     * and reported as a UTF-8 server error message.
     *
     * @param sockKey          key of a connected socket
     * @param buf              scratch buffer with a capacity of at least the
     *                         frame length; its contents, limit and byte order
     *                         are overwritten
     * @param expectedSeq      sequence number the response must carry
     * @param expectedMethodId method id the response must carry
     * @throws IOException if the frame cannot be read, does not match the
     *         expected values, carries an invalid error length, or carries a
     *         server error
     */
    private void readAndValidateResponseFrame(SelectionKey sockKey, ByteBuffer buf, long expectedSeq, int expectedMethodId) throws IOException {
        buf.clear();
        buf.limit(FRAME_LENGTH);
        doRecv(sockKey, buf);
        buf.flip();
        buf.order(ByteOrder.LITTLE_ENDIAN);
        long seq = buf.getLong();
        if (seq != expectedSeq) {
            throw new IOException("Expected sequence number " + expectedSeq + ", but got sequence number " + seq);
        }
        int methodId = buf.getInt();
        if (expectedMethodId != methodId) {
            throw new IOException("Expected method id " + expectedMethodId + ", but got " + methodId);
        }
        int errorLength = buf.getInt();
        buf.getInt(); // last header field; consumed but not used here
        if (errorLength < 0 || errorLength > MAX_ERROR_LENGTH) {
            throw new IOException("Got server error with invalid length " + errorLength);
        }
        if (errorLength > 0) {
            // The error text may be longer than the scratch buffer; setting a
            // limit beyond the capacity would throw IllegalArgumentException.
            ByteBuffer errorBuf = errorLength <= buf.capacity() ? buf : ByteBuffer.allocate(errorLength);
            errorBuf.clear();
            errorBuf.limit(errorLength);
            doRecv(sockKey, errorBuf);
            errorBuf.flip();
            CharBuffer charBuf = StandardCharsets.UTF_8.decode(errorBuf);
            String serverErrorStr = charBuf.toString();
            throw new IOException("Got server error " + serverErrorStr);
        }
    }

    /**
     * Resets the frame, prequel and span buffers and sets the span count to zero.
     */
    @Override
    public void clear() {
        frameBuffer.clear();
        prequel.getBuffer().clear();
        spans.getBuffer().clear();
        numSpans = 0;
    }

    /**
     * Clears the buffers, closes both packed buffers and closes the selector.
     * A failure to close the selector is logged as a warning and not rethrown.
     */
    @Override
    public void close() {
        clear();
        prequel.close();
        spans.close();
        try {
            selector.close();
        }
        catch (IOException e) {
            LOG.warn("Error closing selector", e);
        }
    }
}

