/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.analytics;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.analytics.AnalyticsQueryResultRequest;
import com.couchbase.client.core.message.analytics.AnalyticsQueryStatusRequest;
import com.couchbase.client.core.message.analytics.GenericAnalyticsResponse;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.java.analytics.AsyncAnalyticsDeferredResultHandle;
import com.couchbase.client.java.analytics.AsyncAnalyticsQueryRow;
import com.couchbase.client.java.analytics.DefaultAsyncAnalyticsQueryRow;
import com.couchbase.client.java.bucket.api.Utils;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.error.CannotRetryException;
import com.couchbase.client.java.error.QueryExecutionException;
import com.couchbase.client.java.error.TemporaryFailureException;
import com.couchbase.client.java.error.TranscodingException;
import com.couchbase.client.java.transcoder.TranscoderUtils;
import com.couchbase.client.java.util.OnSubscribeDeferAndWatch;
import com.couchbase.client.java.util.retry.RetryBuilder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action4;
import rx.functions.Func1;

@InterfaceStability.Experimental
@InterfaceAudience.Public
public class DefaultAsyncAnalyticsDeferredResultHandle
implements AsyncAnalyticsDeferredResultHandle {
    private static CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DefaultAsyncAnalyticsDeferredResultHandle.class);
    private final CouchbaseEnvironment env;
    private final ClusterFacade core;
    private final String bucket;
    private final String username;
    private final String password;
    private final String statusHandle;
    private String resultHandle;
    private final long timeout;
    private final TimeUnit timeunit;

    public DefaultAsyncAnalyticsDeferredResultHandle(String handle, CouchbaseEnvironment env, ClusterFacade core, String bucket, String username, String password, long timeout, TimeUnit timeunit) {
        this.statusHandle = handle;
        this.resultHandle = "";
        this.env = env;
        this.core = core;
        this.bucket = bucket;
        this.username = username;
        this.password = password;
        this.timeout = timeout;
        this.timeunit = timeunit;
    }

    @Override
    public String getStatusHandleUri() {
        return this.statusHandle;
    }

    @Override
    public String getResultHandleUri() {
        if (this.resultHandle.length() == 0) {
            throw new IllegalStateException("There is no result handle available, retry status until success");
        }
        return this.resultHandle;
    }

    @Override
    public Observable<AsyncAnalyticsQueryRow> rows() {
        return this.rows(this.timeout, this.timeunit);
    }

    @Override
    public Observable<AsyncAnalyticsQueryRow> rows(final long timeout, final TimeUnit timeunit) {
        if (this.resultHandle.length() == 0) {
            throw new QueryExecutionException("There is no result handle available to fetch rows, retry status call until success", null);
        }
        return OnSubscribeDeferAndWatch.deferAndWatch(new Func1<Subscriber, Observable<GenericAnalyticsResponse>>(){

            public Observable<GenericAnalyticsResponse> call(Subscriber subscriber) {
                AnalyticsQueryResultRequest request = new AnalyticsQueryResultRequest(DefaultAsyncAnalyticsDeferredResultHandle.this.resultHandle, DefaultAsyncAnalyticsDeferredResultHandle.this.bucket, DefaultAsyncAnalyticsDeferredResultHandle.this.username, DefaultAsyncAnalyticsDeferredResultHandle.this.password);
                request.subscriber(subscriber);
                return Utils.applyTimeout(DefaultAsyncAnalyticsDeferredResultHandle.this.core.send(request), request, DefaultAsyncAnalyticsDeferredResultHandle.this.env, timeout, timeunit);
            }
        }).flatMap((Func1)new Func1<GenericAnalyticsResponse, Observable<AsyncAnalyticsQueryRow>>(){

            public Observable<AsyncAnalyticsQueryRow> call(GenericAnalyticsResponse response) {
                return response.rows().map((Func1)new Func1<ByteBuf, AsyncAnalyticsQueryRow>(){

                    public AsyncAnalyticsQueryRow call(ByteBuf byteBuf) {
                        try {
                            TranscoderUtils.ByteBufToArray rawData = TranscoderUtils.byteBufToByteArray(byteBuf);
                            byte[] copy = Arrays.copyOfRange(rawData.byteArray, rawData.offset, rawData.offset + rawData.length);
                            DefaultAsyncAnalyticsQueryRow defaultAsyncAnalyticsQueryRow = new DefaultAsyncAnalyticsQueryRow(copy);
                            return defaultAsyncAnalyticsQueryRow;
                        }
                        catch (Exception e) {
                            throw new TranscodingException("Could not decode Analytics Query Row.", e);
                        }
                        finally {
                            byteBuf.release();
                        }
                    }
                });
            }
        }).retryWhen((Func1)RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 500L, 2L)).max(10).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer attempt, Throwable error, Long delay, TimeUnit delayUnit) {
                LOGGER.debug("Retrying status because of temp failure (attempt {}, delay {} {})", new Object[]{error.getMessage(), attempt, delay, delayUnit});
            }
        }).build()).onErrorResumeNext((Func1)new Func1<Throwable, Observable<? extends AsyncAnalyticsQueryRow>>(){

            public Observable<? extends AsyncAnalyticsQueryRow> call(Throwable throwable) {
                if (throwable instanceof CannotRetryException) {
                    Observable.empty();
                }
                return Observable.error((Throwable)throwable);
            }
        });
    }

    @Override
    public Observable<String> status() {
        return this.status(this.timeout, this.timeunit);
    }

    @Override
    public Observable<String> status(final long timeout, final TimeUnit timeunit) {
        return OnSubscribeDeferAndWatch.deferAndWatch(new Func1<Subscriber, Observable<GenericAnalyticsResponse>>(){

            public Observable<GenericAnalyticsResponse> call(Subscriber subscriber) {
                AnalyticsQueryStatusRequest request = new AnalyticsQueryStatusRequest(DefaultAsyncAnalyticsDeferredResultHandle.this.statusHandle, DefaultAsyncAnalyticsDeferredResultHandle.this.bucket, DefaultAsyncAnalyticsDeferredResultHandle.this.username, DefaultAsyncAnalyticsDeferredResultHandle.this.password);
                request.subscriber(subscriber);
                return Utils.applyTimeout(DefaultAsyncAnalyticsDeferredResultHandle.this.core.send(request), request, DefaultAsyncAnalyticsDeferredResultHandle.this.env, timeout, timeunit);
            }
        }).flatMap((Func1)new Func1<GenericAnalyticsResponse, Observable<String>>(){

            public Observable<String> call(GenericAnalyticsResponse response) {
                DefaultAsyncAnalyticsDeferredResultHandle.this.resultHandle = response.handle();
                return response.queryStatus();
            }
        }).retryWhen((Func1)RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 500L, 2L)).max(10).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer attempt, Throwable error, Long delay, TimeUnit delayUnit) {
                LOGGER.debug("Retrying status because of temp failure (attempt {}, delay {} {})", new Object[]{error.getMessage(), attempt, delay, delayUnit});
            }
        }).build()).onErrorResumeNext((Func1)new Func1<Throwable, Observable<String>>(){

            public Observable<String> call(Throwable throwable) {
                if (throwable instanceof CannotRetryException) {
                    Observable.empty();
                }
                return Observable.error((Throwable)throwable);
            }
        });
    }

    public String toString() {
        return "DefaultAsyncAnalyticsDeferredResultHandle{statusUri='" + this.getStatusHandleUri() + '\'' + ", resultUri='" + this.getResultHandleUri() + '\'' + '}';
    }
}

