/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint.dcp;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.AbstractGenericHandler;
import com.couchbase.client.core.endpoint.ResponseStatusConverter;
import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.endpoint.kv.KeyValueStatus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.dcp.AbstractDCPRequest;
import com.couchbase.client.core.message.dcp.AbstractDCPResponse;
import com.couchbase.client.core.message.dcp.ControlParameter;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
import com.couchbase.client.core.message.dcp.StreamEndMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import rx.functions.Action1;

public class DCPHandler
extends AbstractGenericHandler<FullBinaryMemcacheResponse, BinaryMemcacheRequest, DCPRequest> {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DCPHandler.class);
    public static final byte OP_OPEN_CONNECTION = 80;
    public static final byte OP_STREAM_REQUEST = 83;
    public static final byte OP_STREAM_END = 85;
    public static final byte OP_SNAPSHOT_MARKER = 86;
    public static final byte OP_MUTATION = 87;
    public static final byte OP_REMOVE = 88;
    public static final byte OP_CONTROL = 94;
    public static final byte OP_BUFFER_ACK = 93;
    private final Map<String, DCPConnection> connections = new ConcurrentHashMap<String, DCPConnection>();

    public DCPHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, boolean isTransient) {
        this(endpoint, responseBuffer, (Queue<DCPRequest>)new ArrayDeque<DCPRequest>(), isTransient);
    }

    public DCPHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<DCPRequest> queue, boolean isTransient) {
        super(endpoint, responseBuffer, queue, isTransient);
    }

    @Override
    protected BinaryMemcacheRequest encodeRequest(ChannelHandlerContext ctx, DCPRequest msg) throws Exception {
        BinaryMemcacheRequest request;
        if (msg instanceof OpenConnectionRequest) {
            OpenConnectionRequest openConnection = (OpenConnectionRequest)msg;
            request = this.handleOpenConnectionRequest(ctx, openConnection);
            DCPConnection connection = new DCPConnection(this.env(), openConnection.connectionName(), openConnection.bucket());
            this.connections.put(connection.name(), connection);
        } else if (msg instanceof StreamRequestRequest) {
            request = this.handleStreamRequestRequest(ctx, (StreamRequestRequest)msg);
        } else {
            throw new IllegalArgumentException("Unknown incoming DCPRequest type " + msg.getClass());
        }
        if (msg.partition() >= 0) {
            request.setReserved(msg.partition());
        }
        return request;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected CouchbaseResponse decodeResponse(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg) throws Exception {
        DCPRequest request = (DCPRequest)this.currentRequest();
        AbstractDCPResponse response = null;
        if (msg.getOpcode() == 80 && request instanceof OpenConnectionRequest) {
            DCPConnection connection = this.connections.get(((OpenConnectionRequest)request).connectionName());
            response = new OpenConnectionResponse(ResponseStatusConverter.fromBinary(msg.getStatus()), connection, request);
            if (this.env().dcpConnectionBufferSize() > 0) {
                ctx.writeAndFlush(this.controlRequest(ctx, ControlParameter.CONNECTION_BUFFER_SIZE, this.env().dcpConnectionBufferSize()));
            }
        } else if (msg.getOpcode() == 83 && request instanceof StreamRequestRequest) {
            ByteBuf content = msg.content();
            ArrayList<FailoverLogEntry> failoverLog = null;
            long rollbackToSequenceNumber = 0L;
            KeyValueStatus status = KeyValueStatus.valueOf(msg.getStatus());
            switch (status) {
                case SUCCESS: {
                    failoverLog = new ArrayList<FailoverLogEntry>(content.readableBytes() / 16);
                    while (content.readableBytes() >= 16) {
                        FailoverLogEntry entry = new FailoverLogEntry(content.readLong(), content.readLong());
                        failoverLog.add(entry);
                    }
                    break;
                }
                case ERR_ROLLBACK: {
                    rollbackToSequenceNumber = content.readLong();
                    break;
                }
                default: {
                    LOGGER.warn("Unexpected status of StreamRequestResponse: {} (0x{}, {})", new Object[]{status, Integer.toHexString(status.code()), status.description()});
                }
            }
            DCPConnection connection = this.connections.get(DCPConnection.connectionName(msg.getOpaque()));
            response = new StreamRequestResponse(ResponseStatusConverter.fromBinary(msg.getStatus()), failoverLog, rollbackToSequenceNumber, request, connection);
        } else if (msg.getOpcode() == 94 || msg.getOpcode() == 93) {
            KeyValueStatus status = KeyValueStatus.valueOf(msg.getStatus());
            if (status != KeyValueStatus.SUCCESS) {
                LOGGER.warn("Unexpected status of service response (opcode={}): {} (0x{}, {})", new Object[]{Integer.toHexString(msg.getOpcode()), status, Integer.toHexString(status.code()), status.description()});
            }
        } else {
            DCPRequest oldRequest = (DCPRequest)this.currentRequest();
            String connectionName = DCPConnection.connectionName(msg.getOpaque());
            final DCPConnection connection = this.connections.get(connectionName);
            AbstractDCPRequest dummy = new AbstractDCPRequest(connection.bucket(), null){};
            dummy.observable().subscribe((Action1)new Action1<CouchbaseResponse>(){

                public void call(CouchbaseResponse couchbaseResponse) {
                }
            }, (Action1)new Action1<Throwable>(){

                public void call(Throwable throwable) {
                    connection.subject().onError(throwable);
                }
            });
            try {
                this.currentRequest(dummy);
                this.handleDCPRequest(ctx, connection, msg);
            }
            finally {
                this.currentRequest(oldRequest);
            }
        }
        if (request != null && request.partition() >= 0 && response != null) {
            response.partition(request.partition());
        }
        if (response != null || request == null) {
            this.finishedDecoding();
        }
        return response;
    }

    private void handleDCPRequest(ChannelHandlerContext ctx, DCPConnection connection, FullBinaryMemcacheResponse msg) {
        AbstractDCPRequest request = null;
        int flags = 0;
        switch (msg.getOpcode()) {
            case 86: {
                long startSequenceNumber = 0L;
                long endSequenceNumber = 0L;
                if (msg.getExtrasLength() > 0) {
                    ByteBuf extrasReleased = msg.getExtras();
                    ByteBuf extras = ctx.alloc().buffer(msg.getExtrasLength());
                    extras.writeBytes(extrasReleased, extrasReleased.readerIndex(), extrasReleased.readableBytes());
                    startSequenceNumber = extras.readLong();
                    endSequenceNumber = extras.readLong();
                    flags = extras.readInt();
                    extras.release();
                }
                request = new SnapshotMarkerMessage(msg.getStatus(), startSequenceNumber, endSequenceNumber, flags, connection.bucket());
                break;
            }
            case 87: {
                int expiration = 0;
                int lockTime = 0;
                if (msg.getExtrasLength() > 0) {
                    ByteBuf extrasReleased = msg.getExtras();
                    ByteBuf extras = ctx.alloc().buffer(msg.getExtrasLength());
                    extras.writeBytes(extrasReleased, extrasReleased.readerIndex(), extrasReleased.readableBytes());
                    extras.skipBytes(16);
                    flags = extras.readInt();
                    expiration = extras.readInt();
                    lockTime = extras.readInt();
                    extras.release();
                }
                request = new MutationMessage(msg.getStatus(), msg.getKey(), msg.content().retain(), expiration, flags, lockTime, msg.getCAS(), connection.bucket());
                break;
            }
            case 88: {
                request = new RemoveMessage(msg.getStatus(), msg.getKey(), msg.getCAS(), connection.bucket());
                break;
            }
            case 85: {
                ByteBuf extrasReleased = msg.getExtras();
                ByteBuf extras = ctx.alloc().buffer(msg.getExtrasLength());
                extras.writeBytes(extrasReleased, extrasReleased.readerIndex(), extrasReleased.readableBytes());
                flags = extras.readInt();
                extras.release();
                request = new StreamEndMessage(StreamEndMessage.Reason.valueOf(flags), connection.bucket());
                connection.removeStream(msg.getOpaque());
                break;
            }
            default: {
                LOGGER.info("Unhandled DCP message: {}, {}", (Object)msg.getOpcode(), (Object)msg);
            }
        }
        if (request != null) {
            connection.subject().onNext((Object)request);
        }
        this.updateConnectionStats(ctx, connection, msg);
        if (connection.streamsCount() == 0) {
            connection.subject().onCompleted();
            this.connections.remove(connection.name());
        }
    }

    private void updateConnectionStats(ChannelHandlerContext ctx, DCPConnection connection, FullBinaryMemcacheResponse msg) {
        connection.inc(msg.getTotalBodyLength());
        if ((double)connection.totalReceivedBytes() >= (double)this.env().dcpConnectionBufferSize() * this.env().dcpConnectionBufferAckThreshold()) {
            ctx.writeAndFlush(this.bufferAckRequest(ctx, connection.totalReceivedBytes()));
            connection.reset();
        }
    }

    private BinaryMemcacheRequest bufferAckRequest(ChannelHandlerContext ctx, int size) {
        ByteBuf extras = ctx.alloc().buffer(4).writeInt(size);
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest("", extras);
        request.setOpcode((byte)93);
        request.setExtrasLength((byte)extras.readableBytes());
        request.setTotalBodyLength(extras.readableBytes());
        return request;
    }

    private FullBinaryMemcacheRequest controlRequest(ChannelHandlerContext ctx, ControlParameter parameter, boolean value) {
        return this.controlRequest(ctx, parameter, Boolean.toString(value));
    }

    private FullBinaryMemcacheRequest controlRequest(ChannelHandlerContext ctx, ControlParameter parameter, int value) {
        return this.controlRequest(ctx, parameter, Integer.toString(value));
    }

    private FullBinaryMemcacheRequest controlRequest(ChannelHandlerContext ctx, ControlParameter parameter, String value) {
        String key = parameter.value();
        short keyLength = (short)key.getBytes(CharsetUtil.UTF_8).length;
        byte[] val = value.getBytes(CharsetUtil.UTF_8);
        ByteBuf body = ctx.alloc().buffer(val.length);
        body.writeBytes(val);
        DefaultFullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(key, Unpooled.EMPTY_BUFFER, body);
        request.setOpcode((byte)94);
        request.setKeyLength(keyLength);
        request.setTotalBodyLength(keyLength + body.readableBytes());
        return request;
    }

    private BinaryMemcacheRequest handleOpenConnectionRequest(ChannelHandlerContext ctx, OpenConnectionRequest msg) {
        ByteBuf extras = ctx.alloc().buffer(8);
        extras.writeInt(msg.sequenceNumber()).writeInt(msg.type().flags());
        String key = msg.connectionName();
        byte extrasLength = (byte)extras.readableBytes();
        short keyLength = (short)key.getBytes(CharsetUtil.UTF_8).length;
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(key, extras);
        request.setOpcode((byte)80);
        request.setKeyLength(keyLength);
        request.setExtrasLength(extrasLength);
        request.setTotalBodyLength(keyLength + extrasLength);
        return request;
    }

    private BinaryMemcacheRequest handleStreamRequestRequest(ChannelHandlerContext ctx, StreamRequestRequest msg) {
        DCPConnection connection = this.connections.get(msg.connectionName());
        ByteBuf extras = ctx.alloc().buffer(48);
        extras.writeInt(0).writeInt(0).writeLong(msg.startSequenceNumber()).writeLong(msg.endSequenceNumber()).writeLong(msg.vbucketUUID()).writeLong(msg.snapshotStartSequenceNumber()).writeLong(msg.snapshotEndSequenceNumber());
        byte extrasLength = (byte)extras.readableBytes();
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(extras);
        request.setOpcode((byte)83);
        request.setExtrasLength(extrasLength);
        request.setTotalBodyLength(extrasLength);
        request.setOpaque(connection.addStream(connection.name()));
        return request;
    }

    @Override
    protected ServiceType serviceType() {
        return ServiceType.DCP;
    }
}

