/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.bucket;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.kv.AbstractKeyValueRequest;
import com.couchbase.client.core.message.kv.BinaryRequest;
import com.couchbase.client.core.message.kv.GetRequest;
import com.couchbase.client.core.message.kv.GetResponse;
import com.couchbase.client.core.message.kv.ReplicaGetRequest;
import com.couchbase.client.java.ReplicaMode;
import com.couchbase.client.java.bucket.api.Get;
import com.couchbase.client.java.bucket.api.Utils;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.transcoder.Transcoder;
import com.couchbase.client.java.util.OnSubscribeDeferAndWatch;
import io.opentracing.Scope;
import io.opentracing.Span;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;

@InterfaceStability.Uncommitted
@InterfaceAudience.Private
public class ReplicaReader {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(ReplicaReader.class);

    private ReplicaReader() {
    }

    public static <D extends Document<?>> Observable<D> read(final ClusterFacade core, final String id, final ReplicaMode type, final String bucket, final Map<Class<? extends Document>, Transcoder<? extends Document, ?>> transcoders, final Class<D> target, final CouchbaseEnvironment environment, final long timeout, final TimeUnit timeUnit) {
        return Observable.defer((Func0)new Func0<Observable<D>>(){

            public Observable<D> call() {
                Span parentSpan;
                if (environment.operationTracingEnabled()) {
                    Scope scope = environment.tracer().buildSpan("get_from_replica").startActive(false);
                    parentSpan = scope.span();
                    scope.close();
                } else {
                    parentSpan = null;
                }
                Observable result = ReplicaReader.assembleRequests(core, id, type, bucket).flatMap(new Func1<BinaryRequest, Observable<D>>(){

                    public Observable<D> call(final BinaryRequest request) {
                        String name = request instanceof ReplicaGetRequest ? "get_replica" : "get";
                        Utils.addRequestSpanWithParent(environment, parentSpan, request, name);
                        Observable result = OnSubscribeDeferAndWatch.deferAndWatch(new Func1<Subscriber, Observable<GetResponse>>(){

                            public Observable<GetResponse> call(Subscriber subscriber) {
                                request.subscriber(subscriber);
                                return core.send(request);
                            }
                        }).filter((Func1)new Get.GetFilter(environment));
                        if (timeout > 0L) {
                            result = result.timeout(timeout, timeUnit, environment.scheduler());
                        }
                        return result.onErrorResumeNext((Func1)GetResponseErrorHandler.INSTANCE).map(new Get.GetMap(environment, transcoders, target, id));
                    }
                });
                if (timeout > 0L) {
                    result = result.timeout(timeout, timeUnit, environment.scheduler());
                }
                return result.doOnTerminate(new Action0(){

                    public void call() {
                        if (environment.operationTracingEnabled() && parentSpan != null) {
                            environment.tracer().scopeManager().activate(parentSpan, true).close();
                        }
                    }
                }).cacheWithInitialCapacity(type.maxAffectedNodes());
            }
        });
    }

    private static Observable<BinaryRequest> assembleRequests(final ClusterFacade core, final String id, ReplicaMode type, final String bucket) {
        if (type != ReplicaMode.ALL) {
            return Observable.just((Object)new ReplicaGetRequest(id, bucket, (short)type.ordinal()));
        }
        return Observable.defer((Func0)new Func0<Observable<GetClusterConfigResponse>>(){

            public Observable<GetClusterConfigResponse> call() {
                return core.send(new GetClusterConfigRequest());
            }
        }).map((Func1)new Func1<GetClusterConfigResponse, Integer>(){

            public Integer call(GetClusterConfigResponse response) {
                CouchbaseBucketConfig conf = (CouchbaseBucketConfig)response.config().bucketConfig(bucket);
                return conf.numberOfReplicas();
            }
        }).flatMap((Func1)new Func1<Integer, Observable<BinaryRequest>>(){

            public Observable<BinaryRequest> call(Integer max) {
                ArrayList<AbstractKeyValueRequest> requests = new ArrayList<AbstractKeyValueRequest>();
                requests.add(new GetRequest(id, bucket));
                for (int i = 0; i < max; ++i) {
                    requests.add(new ReplicaGetRequest(id, bucket, (short)(i + 1)));
                }
                return Observable.from(requests);
            }
        });
    }

    private static class GetResponseErrorHandler
    implements Func1<Throwable, Observable<? extends GetResponse>> {
        public static final GetResponseErrorHandler INSTANCE = new GetResponseErrorHandler();

        private GetResponseErrorHandler() {
        }

        public Observable<? extends GetResponse> call(Throwable throwable) {
            LOGGER.info("Individual ReplicaGet failed, but ignoring. Reason: {}", (Object)throwable.toString());
            return Observable.empty();
        }
    }
}

