/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.ha.HAService;

public class HAConnection {
    private static final InternalLogger log = InternalLoggerFactory.getLogger((String)"RocketmqStore");
    private final HAService haService;
    private final SocketChannel socketChannel;
    private final String clientAddr;
    private WriteSocketService writeSocketService;
    private ReadSocketService readSocketService;
    private volatile long slaveRequestOffset = -1L;
    private volatile long slaveAckOffset = -1L;

    public HAConnection(HAService haService, SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(65536);
        this.socketChannel.socket().setSendBufferSize(65536);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();
    }

    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }

    public void shutdown() {
        this.writeSocketService.shutdown(true);
        this.readSocketService.shutdown(true);
        this.close();
    }

    public void close() {
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            }
            catch (IOException e) {
                log.error("", (Throwable)e);
            }
        }
    }

    public SocketChannel getSocketChannel() {
        return this.socketChannel;
    }

    class WriteSocketService
    extends ServiceThread {
        private final Selector selector;
        private final SocketChannel socketChannel;
        private final int headerSize = 12;
        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(12);
        private long nextTransferFromWhere = -1L;
        private SelectMappedBufferResult selectMappedBufferResult;
        private boolean lastWriteOver = true;
        private long lastWriteTimestamp = System.currentTimeMillis();

        public WriteSocketService(SocketChannel socketChannel) throws IOException {
            this.selector = RemotingUtil.openSelector();
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, 4);
            this.setDaemon(true);
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    SelectMappedBufferResult selectResult;
                    this.selector.select(1000L);
                    if (-1L == HAConnection.this.slaveRequestOffset) {
                        Thread.sleep(10L);
                        continue;
                    }
                    if (-1L == this.nextTransferFromWhere) {
                        if (0L == HAConnection.this.slaveRequestOffset) {
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            if ((masterOffset -= masterOffset % (long)HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog()) < 0L) {
                                masterOffset = 0L;
                            }
                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }
                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }
                    if (this.lastWriteOver) {
                        long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        if (interval > (long)HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(12);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                            this.byteBufferHeader.putInt(0);
                            this.byteBufferHeader.flip();
                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver) {
                                continue;
                            }
                        }
                    } else {
                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver) continue;
                    }
                    if ((selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere)) != null) {
                        int size = selectResult.getSize();
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }
                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += (long)size;
                        selectResult.getByteBuffer().limit(size);
                        this.selectMappedBufferResult = selectResult;
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(12);
                        this.byteBufferHeader.putLong(thisOffset);
                        this.byteBufferHeader.putInt(size);
                        this.byteBufferHeader.flip();
                        this.lastWriteOver = this.transferData();
                        continue;
                    }
                    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100L);
                }
                catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", (Throwable)e);
                    break;
                }
            }
            HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
            if (this.selectMappedBufferResult != null) {
                this.selectMappedBufferResult.release();
            }
            this.makeStop();
            HAConnection.this.readSocketService.makeStop();
            HAConnection.this.haService.removeConnection(HAConnection.this);
            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }
            try {
                this.selector.close();
                this.socketChannel.close();
            }
            catch (IOException e) {
                log.error("", (Throwable)e);
            }
            log.info(this.getServiceName() + " service end");
        }

        private boolean transferData() throws Exception {
            boolean result;
            int writeSize;
            int writeSizeZeroTimes = 0;
            while (this.byteBufferHeader.hasRemaining()) {
                writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    continue;
                }
                if (writeSize == 0) {
                    if (++writeSizeZeroTimes < 3) continue;
                    break;
                }
                throw new Exception("ha master write header error < 0");
            }
            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }
            writeSizeZeroTimes = 0;
            if (!this.byteBufferHeader.hasRemaining()) {
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        continue;
                    }
                    if (writeSize == 0) {
                        if (++writeSizeZeroTimes < 3) continue;
                        break;
                    }
                    throw new Exception("ha master write body error < 0");
                }
            }
            boolean bl = result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;
            }
            return result;
        }

        public String getServiceName() {
            return WriteSocketService.class.getSimpleName();
        }

        public void shutdown() {
            super.shutdown();
        }
    }

    class ReadSocketService
    extends ServiceThread {
        private static final int READ_MAX_BUFFER_SIZE = 0x100000;
        private final Selector selector;
        private final SocketChannel socketChannel;
        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(0x100000);
        private int processPosition = 0;
        private volatile long lastReadTimestamp = System.currentTimeMillis();

        public ReadSocketService(SocketChannel socketChannel) throws IOException {
            this.selector = RemotingUtil.openSelector();
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, 1);
            this.setDaemon(true);
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000L);
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        log.error("processReadEvent error");
                        break;
                    }
                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                    if (interval <= (long)HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) continue;
                    log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                    break;
                }
                catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", (Throwable)e);
                    break;
                }
            }
            this.makeStop();
            HAConnection.this.writeSocketService.makeStop();
            HAConnection.this.haService.removeConnection(HAConnection.this);
            HAConnection.this.haService.getConnectionCount().decrementAndGet();
            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }
            try {
                this.selector.close();
                this.socketChannel.close();
            }
            catch (IOException e) {
                log.error("", (Throwable)e);
            }
            log.info(this.getServiceName() + " service end");
        }

        public String getServiceName() {
            return ReadSocketService.class.getSimpleName();
        }

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            if (!this.byteBufferRead.hasRemaining()) {
                this.byteBufferRead.flip();
                this.processPosition = 0;
            }
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        if (this.byteBufferRead.position() - this.processPosition < 8) continue;
                        int pos = this.byteBufferRead.position() - this.byteBufferRead.position() % 8;
                        long readOffset = this.byteBufferRead.getLong(pos - 8);
                        this.processPosition = pos;
                        HAConnection.this.slaveAckOffset = readOffset;
                        if (HAConnection.this.slaveRequestOffset < 0L) {
                            HAConnection.this.slaveRequestOffset = readOffset;
                            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                        }
                        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        continue;
                    }
                    if (readSize == 0) {
                        if (++readSizeZeroTimes < 3) continue;
                        break;
                    }
                    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                    return false;
                }
                catch (IOException e) {
                    log.error("processReadEvent exception", (Throwable)e);
                    return false;
                }
            }
            return true;
        }
    }
}

