/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint.config;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.AbstractGenericHandler;
import com.couchbase.client.core.endpoint.ResponseStatusConverter;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.config.BucketConfigRequest;
import com.couchbase.client.core.message.config.BucketConfigResponse;
import com.couchbase.client.core.message.config.BucketStreamingRequest;
import com.couchbase.client.core.message.config.BucketStreamingResponse;
import com.couchbase.client.core.message.config.BucketsConfigRequest;
import com.couchbase.client.core.message.config.BucketsConfigResponse;
import com.couchbase.client.core.message.config.ClusterConfigRequest;
import com.couchbase.client.core.message.config.ClusterConfigResponse;
import com.couchbase.client.core.message.config.ConfigRequest;
import com.couchbase.client.core.message.config.FlushRequest;
import com.couchbase.client.core.message.config.FlushResponse;
import com.couchbase.client.core.message.config.GetDesignDocumentsRequest;
import com.couchbase.client.core.message.config.GetDesignDocumentsResponse;
import com.couchbase.client.core.message.config.InsertBucketRequest;
import com.couchbase.client.core.message.config.InsertBucketResponse;
import com.couchbase.client.core.message.config.RemoveBucketRequest;
import com.couchbase.client.core.message.config.RemoveBucketResponse;
import com.couchbase.client.core.message.config.UpdateBucketRequest;
import com.couchbase.client.core.message.config.UpdateBucketResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpContent;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpObject;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpRequest;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpResponse;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.deps.io.netty.handler.codec.http.LastHttpContent;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import rx.Observable;
import rx.subjects.BehaviorSubject;

public class ConfigHandler
extends AbstractGenericHandler<HttpObject, HttpRequest, ConfigRequest> {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(ConfigHandler.class);
    private HttpResponse responseHeader;
    private ByteBuf responseContent;
    private BehaviorSubject<String> streamingConfigObservable;

    public ConfigHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, boolean isTransient) {
        super(endpoint, responseBuffer, isTransient);
    }

    ConfigHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<ConfigRequest> queue, boolean isTransient) {
        super(endpoint, responseBuffer, queue, isTransient);
    }

    @Override
    protected HttpRequest encodeRequest(ChannelHandlerContext ctx, ConfigRequest msg) throws Exception {
        HttpMethod httpMethod = HttpMethod.GET;
        if (msg instanceof FlushRequest || msg instanceof InsertBucketRequest || msg instanceof UpdateBucketRequest) {
            httpMethod = HttpMethod.POST;
        } else if (msg instanceof RemoveBucketRequest) {
            httpMethod = HttpMethod.DELETE;
        }
        ByteBuf content = msg instanceof InsertBucketRequest ? Unpooled.copiedBuffer(((InsertBucketRequest)msg).payload(), CharsetUtil.UTF_8) : (msg instanceof UpdateBucketRequest ? Unpooled.copiedBuffer(((UpdateBucketRequest)msg).payload(), CharsetUtil.UTF_8) : Unpooled.EMPTY_BUFFER);
        DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, msg.path(), content);
        request.headers().set("User-Agent", (Object)this.env().userAgent());
        if (msg instanceof InsertBucketRequest || msg instanceof UpdateBucketRequest) {
            request.headers().set("Accept", (Object)"*/*");
            request.headers().set("Content-Type", (Object)"application/x-www-form-urlencoded");
        }
        request.headers().set("Content-Length", (Object)content.readableBytes());
        ConfigHandler.addHttpBasicAuth(ctx, request, msg.bucket(), msg.password());
        return request;
    }

    @Override
    protected CouchbaseResponse decodeResponse(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        ConfigRequest request = (ConfigRequest)this.currentRequest();
        CouchbaseResponse response = null;
        if (msg instanceof HttpResponse) {
            this.responseHeader = (HttpResponse)msg;
            if (request instanceof BucketStreamingRequest) {
                response = this.handleBucketStreamingResponse(ctx, this.responseHeader);
            }
            if (this.responseContent != null) {
                this.responseContent.clear();
            } else {
                this.responseContent = ctx.alloc().buffer();
            }
        }
        if (msg instanceof HttpContent) {
            this.responseContent.writeBytes(((HttpContent)msg).content());
            if (this.streamingConfigObservable != null) {
                this.maybePushConfigChunk();
            }
        }
        if (msg instanceof LastHttpContent) {
            String body;
            if (request instanceof BucketStreamingRequest) {
                if (this.streamingConfigObservable != null) {
                    this.streamingConfigObservable.onCompleted();
                    this.streamingConfigObservable = null;
                }
                this.finishedDecoding();
                return null;
            }
            ResponseStatus status = ResponseStatusConverter.fromHttp(this.responseHeader.getStatus().code());
            String string = body = this.responseContent.readableBytes() > 0 ? this.responseContent.toString(CHARSET) : this.responseHeader.getStatus().reasonPhrase();
            if (request instanceof BucketConfigRequest) {
                response = new BucketConfigResponse(body, status);
            } else if (request instanceof ClusterConfigRequest) {
                response = new ClusterConfigResponse(body, status);
            } else if (request instanceof BucketsConfigRequest) {
                response = new BucketsConfigResponse(body, status);
            } else if (request instanceof GetDesignDocumentsRequest) {
                response = new GetDesignDocumentsResponse(body, status, request);
            } else if (request instanceof RemoveBucketRequest) {
                response = new RemoveBucketResponse(status);
            } else if (request instanceof InsertBucketRequest) {
                response = new InsertBucketResponse(body, status);
            } else if (request instanceof UpdateBucketRequest) {
                response = new UpdateBucketResponse(body, status);
            } else if (request instanceof FlushRequest) {
                boolean done = this.responseHeader.getStatus().code() != 201;
                response = new FlushResponse(done, body, status);
            }
            this.finishedDecoding();
        }
        return response;
    }

    private CouchbaseResponse handleBucketStreamingResponse(ChannelHandlerContext ctx, HttpResponse header) {
        SocketAddress addr = ctx.channel().remoteAddress();
        String host = addr instanceof InetSocketAddress ? ((InetSocketAddress)addr).getHostName() : addr.toString();
        ResponseStatus status = ResponseStatusConverter.fromHttp(header.getStatus().code());
        Observable scheduledObservable = null;
        if (status.isSuccess()) {
            this.streamingConfigObservable = BehaviorSubject.create();
            scheduledObservable = this.streamingConfigObservable.onBackpressureBuffer().observeOn(this.env().scheduler());
        }
        return new BucketStreamingResponse((Observable<String>)scheduledObservable, host, status, (CouchbaseRequest)this.currentRequest());
    }

    private void maybePushConfigChunk() {
        String currentChunk = this.responseContent.toString(CHARSET);
        int separatorIndex = currentChunk.indexOf("\n\n\n\n");
        if (separatorIndex > 0) {
            String content = currentChunk.substring(0, separatorIndex);
            this.streamingConfigObservable.onNext((Object)content.trim());
            this.responseContent.clear();
            this.responseContent.writeBytes(currentChunk.substring(separatorIndex + 4).getBytes(CHARSET));
        }
    }

    private void releaseResponseContent() {
        if (this.responseContent != null) {
            if (this.responseContent.refCnt() > 0) {
                this.responseContent.release();
            }
            this.responseContent = null;
        }
    }

    @Override
    protected void finishedDecoding() {
        super.finishedDecoding();
        this.releaseResponseContent();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        if (this.streamingConfigObservable != null) {
            try {
                this.streamingConfigObservable.onCompleted();
            }
            catch (RejectedExecutionException ex) {
                LOGGER.info(ConfigHandler.logIdent(ctx, this.endpoint()) + "Could not complete config stream, scheduler shut " + "down already.");
            }
        }
        super.handlerRemoved(ctx);
        this.releaseResponseContent();
    }

    @Override
    protected ServiceType serviceType() {
        return ServiceType.CONFIG;
    }
}

