/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.protocol.nio.impl;

import com.solacesystems.common.util.StringUtil;
import com.solacesystems.jcsmp.JCSMPFatalErrorException;
import com.solacesystems.jcsmp.JCSMPSessionStats;
import com.solacesystems.jcsmp.protocol.WireMessage;
import com.solacesystems.jcsmp.protocol.impl.TcpClientChannel;
import com.solacesystems.jcsmp.protocol.nio.ReadIOHandler;
import com.solacesystems.jcsmp.protocol.nio.SubscriberWireMessageHandler;
import com.solacesystems.jcsmp.protocol.nio.WriteIOHandler;
import com.solacesystems.jcsmp.protocol.nio.impl.AbstractSMFReadHandler;
import com.solacesystems.jcsmp.protocol.nio.impl.NioSmfUtil;
import com.solacesystems.jcsmp.protocol.smf.SMFWireMessageHandler;
import com.solacesystems.jcsmp.protocol.smf.impl.WireMessageFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Read handler that decodes wire messages out of the inherited input buffer and
 * hands each decoded message to a {@link SubscriberWireMessageHandler} callback.
 *
 * <p>The callback can be cleared at any time through {@link #clearCallbackHandler()};
 * every method that uses the callback therefore takes a single snapshot of the
 * volatile field and works on that snapshot only.
 *
 * <p>The write-side methods of {@link WriteIOHandler} are implemented as no-ops.
 */
public class SubscriberMessageReader
extends AbstractSMFReadHandler
implements ReadIOHandler,
WriteIOHandler {
    private static final Log log = LogFactory.getLog(SubscriberMessageReader.class);

    /** Upper bound, in characters, on the diagnostic buffer dump written to the log. */
    private static final int MAX_DUMP_LEN = 200000;

    /** Message fragment identifying the one decode failure that processRead() logs and swallows. */
    private static final String RECOVERABLE_DECODE_ERROR = "Unable to decode SMF header optional parameters";

    private volatile ByteChannel _sc;
    private volatile SubscriberWireMessageHandler _subCallback;
    private final Object _subCallbackLock;
    private final SMFWireMessageHandler _smfWireMessageHandler;
    private final boolean _genRxTimestamps;

    /**
     * @param byteChannel     channel returned by {@link #getChannel()} and matched in
     *                        {@link #handleClosedSocketDuringRead}
     * @param callbackHandler receiver of decoded messages and of errors; may later be
     *                        cleared via {@link #clearCallbackHandler()}
     * @param genRxTimestamps when true, decoded messages are stamped with the wall-clock
     *                        time taken at the start of each processRead() pass;
     *                        otherwise they are stamped with 0
     * @param sessionstats    passed through to the superclass constructor
     */
    public SubscriberMessageReader(ByteChannel byteChannel, SubscriberWireMessageHandler callbackHandler, boolean genRxTimestamps, JCSMPSessionStats sessionstats) {
        super(sessionstats);
        this._sc = byteChannel;
        this._subCallback = callbackHandler;
        this._subCallbackLock = new Object();
        this._smfWireMessageHandler = new SMFWireMessageHandler();
        this._genRxTimestamps = genRxTimestamps;
    }

    /**
     * Delegates to {@code NioSmfUtil.isSmfMessageAvailableAtCurrentPos} on the input buffer.
     * processRead() uses the result to decide whether to decode another message.
     */
    @Override
    boolean isReadComplete() throws IOException {
        return NioSmfUtil.isSmfMessageAvailableAtCurrentPos(this._inputBuf);
    }

    /** Returns the network info string for the currently installed callback (see the overload). */
    String getNetworkInfoString() {
        return this.getNetworkInfoString(this._subCallback);
    }

    /**
     * Returns {@code handler}'s network info string when it is a {@link TcpClientChannel},
     * otherwise the empty string (this includes a null handler).
     */
    String getNetworkInfoString(SubscriberWireMessageHandler handler) {
        if (handler instanceof TcpClientChannel) {
            return ((TcpClientChannel)handler).getNetworkInfoString();
        }
        return "";
    }

    /**
     * Logs, at error level, a formatted dump of the bytes from the buffer's array offset
     * up to its limit. The text is truncated to {@link #MAX_DUMP_LEN} characters.
     * Buffers without an accessible backing array are silently skipped.
     *
     * @param buf buffer whose contents should be dumped
     */
    void logErrorBufferContents(ByteBuffer buf) {
        if (!buf.hasArray()) {
            return;
        }
        String fmtErrorBuffer = String.format("Invalid data found in buffer (%s) arrayOffset=%s %n", buf, buf.arrayOffset());
        try {
            byte[] data = new byte[buf.limit()];
            System.arraycopy(buf.array(), buf.arrayOffset(), data, 0, data.length);
            fmtErrorBuffer = fmtErrorBuffer + StringUtil.formatDumpBytes(data, true, 0);
            if (fmtErrorBuffer.length() > MAX_DUMP_LEN) {
                fmtErrorBuffer = fmtErrorBuffer.substring(0, MAX_DUMP_LEN);
            }
        }
        catch (Throwable t) {
            // Diagnostics are best-effort: a failure while dumping must never mask the original error.
            fmtErrorBuffer = fmtErrorBuffer + "error dumping: " + t.toString();
        }
        log.error(this.getNetworkInfoString() + fmtErrorBuffer);
    }

    /**
     * Decodes messages from the input buffer, at least one and then for as long as
     * {@link #isReadComplete()} returns true, delivering each to the callback if one is set.
     * All messages decoded in one call share the same receive timestamp.
     *
     * @throws IOException if decoding fails for any reason other than the recoverable
     *                     "optional parameters" decode error, which is logged and swallowed
     */
    @Override
    void processRead() throws IOException {
        long timestamp = this._genRxTimestamps ? System.currentTimeMillis() : 0L;
        try {
            do {
                WireMessage wm = WireMessageFactory.create();
                this._smfWireMessageHandler.readMessage(this._inputBuf, wm);
                wm.setRxTimestamp(timestamp);
                // Snapshot the volatile field: clearCallbackHandler() may null it between a check and a call.
                SubscriberWireMessageHandler callback = this._subCallback;
                if (callback != null) {
                    callback.handleMessage(wm);
                }
            } while (this.isReadComplete());
        }
        catch (IOException e) {
            // getMessage() may be null; a null message is treated as non-recoverable instead of
            // raising a NullPointerException from inside this handler.
            String message = e.getMessage();
            boolean recoverable = message != null && message.contains(RECOVERABLE_DECODE_ERROR);
            if (!recoverable) {
                log.error(this.getNetworkInfoString() + "Caught IOException in processRead (lost SMF framing), rethrowing: ", e);
                this.logErrorBufferContents(this._inputBuf);
                throw (IOException)new IOException(this.getNetworkInfoString() + e.getMessage()).initCause(e);
            }
            log.error(this.getNetworkInfoString() + "Caught IOException in readTLVsFast, trying to recover the parser:", e);
            this.logErrorBufferContents(this._inputBuf);
        }
    }

    /**
     * Drains pending input into the buffer and, under the input-buffer lock, either processes
     * the buffered data, grows the buffer, or restores the buffer cursor to wait for more data.
     *
     * <p>An {@link IOException} is reported to the callback (after deregistering it from the
     * reactor); any other exception is logged and suppressed. Nothing is thrown to the caller.
     */
    @Override
    public void read() {
        try {
            synchronized (this._inputBufLock) {
                this.drain();
                // Remember the pre-flip cursor so it can be restored if no full message is buffered yet.
                int position = this._inputBuf.position();
                int limit = this._inputBuf.limit();
                this._inputBuf.flip();
                int avail = this._inputBuf.remaining();
                int reportedLen;
                try {
                    reportedLen = NioSmfUtil.smfSizeReqAtCurrentPos(this._inputBuf);
                }
                catch (IOException e) {
                    log.error(this.getNetworkInfoString() + "Caught IOException in read (checking smf message size), rethrowing: ", e);
                    this.logErrorBufferContents(this._inputBuf);
                    throw e;
                }
                assert (this._inputBuf.position() == 0);
                if (avail >= reportedLen) {
                    this.processRead();
                    this._inputBuf.compact();
                } else if (reportedLen > this._inputBuf.capacity()) {
                    // Reported size exceeds the buffer: grow to the reported size plus 10%.
                    this.resizeBuffer((int)((double)reportedLen * 1.1));
                } else {
                    // Not enough data yet: undo the flip (position first, since it equals the current limit).
                    this._inputBuf.position(position);
                    this._inputBuf.limit(limit);
                }
            }
        }
        catch (IOException e) {
            log.info(this.getNetworkInfoString() + "SubscriberMessageReader:read Exception", e);
            SubscriberWireMessageHandler localSubCallback;
            synchronized (this._subCallbackLock) {
                localSubCallback = this._subCallback;
            }
            if (localSubCallback != null) {
                localSubCallback.deregisterChannelToReactor();
                localSubCallback.handleException((IOException)new IOException(this.getNetworkInfoString(localSubCallback) + e.getMessage()).initCause(e));
            }
        }
        catch (Exception e) {
            // The logger already records the stack trace; nothing is written to stderr.
            log.warn(this.getNetworkInfoString() + "SubscriberMessageReader:read Exception", e);
        }
    }

    /**
     * Returns the current channel cast to {@link AbstractSelectableChannel}.
     *
     * @throws ClassCastException if the configured channel is not an AbstractSelectableChannel
     */
    @Override
    public AbstractSelectableChannel getChannel() {
        return (AbstractSelectableChannel)this._sc;
    }

    /** Replaces the channel returned by {@link #getChannel()}. */
    public void setChannel(ByteChannel sc) {
        this._sc = sc;
    }

    /** No-op: this handler performs no writes. */
    @Override
    public void write(SelectableChannel keyChannel) {
    }

    /** Forwards the fatal error to the callback, or logs at debug level when no callback is set. */
    @Override
    public void notifyFatalError(JCSMPFatalErrorException ex) {
        SubscriberWireMessageHandler callback = this._subCallback;
        if (callback != null) {
            // NOTE(review): cast-through-Object kept from the decompiled source; the exception's
            // type hierarchy is not visible here, so the expression is preserved as-is.
            callback.handleException((Exception)((Object)ex));
        } else {
            log.debug("Ignored JCSMPFatalErrorException, null subCallback on SubscriberMessageReader.");
        }
    }

    /**
     * Reports a cancelled selection key to the callback as an {@link IOException} (with the
     * original exception attached as its cause), but only when {@code sc} is this reader's channel.
     */
    @Override
    public void handleClosedSocketDuringRead(CancelledKeyException cke, SelectableChannel sc) {
        if (sc != this._sc) {
            log.debug("Caught closed socket notification in SubscriberMessageReader, ignoring: not for this channel.");
            return;
        }
        SubscriberWireMessageHandler callback = this._subCallback;
        if (callback != null) {
            callback.handleException((IOException)new IOException(cke.getMessage()).initCause(cke));
        }
    }

    /** No-op: this handler performs no writes. */
    @Override
    public void handleClosedSocketDuringWrite(CancelledKeyException cke, SelectableChannel sc) {
    }

    /** Removes the callback; messages decoded afterwards are dropped and errors are not reported. */
    @Override
    public void clearCallbackHandler() {
        synchronized (this._subCallbackLock) {
            this._subCallback = null;
        }
    }
}

