/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.documentdb.internal.query;

import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.internal.DocumentServiceRequest;
import com.microsoft.azure.documentdb.internal.DocumentServiceResponse;
import com.microsoft.azure.documentdb.internal.query.FetchScheduler;
import com.microsoft.azure.documentdb.internal.query.funcs.Callback3;
import com.microsoft.azure.documentdb.internal.query.funcs.Func1;
import com.microsoft.azure.documentdb.internal.query.funcs.Func2;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DocumentProducer<T extends Document> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DocumentProducer.class);
    private static final double ITEM_BUFFER_THRESHOLD = 0.1;
    private final AtomicInteger fetchInvocationCount = new AtomicInteger(0);
    private final AtomicInteger moveNextInvocationCount = new AtomicInteger(0);
    private final Func1<DocumentServiceRequest, DocumentServiceResponse> executeRequestFunc;
    private final BlockingQueue<FetchResult> fetchResultBuffer;
    private final Func2<String, Integer, DocumentServiceRequest> createRequestFunc;
    private final PartitionKeyRange targetRange;
    private final Class<T> deserializationClass;
    private final Semaphore fetchStateSemaphore = new Semaphore(1);
    protected final AtomicBoolean isFetching = new AtomicBoolean(false);
    private final AtomicInteger bufferedDocumentsCount = new AtomicInteger(0);
    private final FetchScheduler fetchScheduler;
    private final Callback3<DocumentProducer<T>, Integer, Double> produceCompleteCallback;
    private Iterator<T> currentIterator;
    private Document currentDocument;
    private boolean hasStarted;
    private int previousResponseItemCount;
    private Map<String, String> previousResponseHeaders;
    public volatile boolean isDone;
    private String responseContinuation;
    private String previousResponseContinuation;
    private int pageSize;
    private int itemsTillNextContinuationBoundary;
    private volatile String currentBackendContinuationToken;
    private boolean isAtContinuationBoundary;

    public DocumentProducer(Func1<DocumentServiceRequest, DocumentServiceResponse> executeRequestFunc, Func2<String, Integer, DocumentServiceRequest> createRequestFunc, PartitionKeyRange targetRange, Class<T> deserializationClass, FetchScheduler fetchScheduler, int initialPageSize, String initialContinuationToken, Callback3<DocumentProducer<T>, Integer, Double> produceCompleteCallback) {
        this.fetchResultBuffer = new LinkedBlockingQueue<FetchResult>();
        this.executeRequestFunc = executeRequestFunc;
        this.createRequestFunc = createRequestFunc;
        this.targetRange = targetRange;
        this.deserializationClass = deserializationClass;
        this.currentDocument = null;
        this.hasStarted = false;
        this.pageSize = initialPageSize;
        this.currentBackendContinuationToken = initialContinuationToken;
        this.fetchScheduler = fetchScheduler;
        this.produceCompleteCallback = produceCompleteCallback;
    }

    boolean shouldFetchInternal() {
        return (double)(this.itemsTillNextContinuationBoundary - 1) < (double)this.normalizedPageSize() * 0.1 && this.fetchResultBuffer.size() <= 0;
    }

    public int normalizedPageSize() {
        return this.pageSize == -1 ? 1000 : this.pageSize;
    }

    public boolean isAtContinuationBoundary() {
        return this.isAtContinuationBoundary;
    }

    public int getItemsTillNextContinuationBoundary() {
        return this.itemsTillNextContinuationBoundary;
    }

    private boolean shouldFetch() throws InterruptedException {
        if (this.fetchedAll()) {
            return false;
        }
        if (this.shouldFetchInternal()) {
            this.fetchStateSemaphore.acquire();
            try {
                boolean bl = this.shouldFetchInternal() && !this.isFetching.get();
                return bl;
            }
            finally {
                this.fetchStateSemaphore.release();
            }
        }
        return false;
    }

    private void updateRequestContinuationToken(String continuationToken) {
        this.currentBackendContinuationToken = continuationToken;
        this.hasStarted = true;
    }

    private void completeFetch(List<T> docs, Map<String, String> headerResponse) throws InterruptedException {
        this.fetchStateSemaphore.acquire();
        try {
            if (docs.size() > 0) {
                this.fetchResultBuffer.add(new FetchResult<T>(docs, headerResponse));
                this.bufferedDocumentsCount.addAndGet(docs.size());
            }
            if (this.fetchedAll()) {
                this.fetchResultBuffer.add(FetchResult.DoneResult);
            }
            this.isFetching.set(false);
        }
        finally {
            this.fetchStateSemaphore.release();
        }
    }

    private double parseRequestCharge(DocumentServiceResponse response) {
        String requestChargeHeader = response.getResponseHeaders().get("x-ms-request-charge");
        if (!StringUtils.isEmpty((CharSequence)requestChargeHeader)) {
            return Double.valueOf(requestChargeHeader.trim());
        }
        return 0.0;
    }

    private void scheduleFetch() {
        LOGGER.trace("fetchAsync invoked");
        final DocumentProducer that = this;
        Callable<Void> callable = new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                LOGGER.trace("fetchAsync callable is getting executed");
                DocumentProducer.this.fetchInvocationCount.incrementAndGet();
                FetchResult exceptionFetchResult = null;
                try {
                    List items = null;
                    DocumentServiceResponse response = null;
                    double requestCharge = 0.0;
                    do {
                        LOGGER.trace("Sending a request with continuation token {}", (Object)that.currentBackendContinuationToken);
                        DocumentServiceRequest request = (DocumentServiceRequest)that.createRequestFunc.apply(that.currentBackendContinuationToken, that.pageSize);
                        response = (DocumentServiceResponse)DocumentProducer.this.executeRequestFunc.apply(request);
                        requestCharge += DocumentProducer.this.parseRequestCharge(response);
                        that.updateRequestContinuationToken(response.getResponseHeaders().get("x-ms-continuation"));
                        items = response.getQueryResponse(that.deserializationClass);
                        that.previousResponseItemCount = items.size();
                        LOGGER.trace("Producer with range Id {} fetched {} items", (Object)that.targetRange.getId(), (Object)items.size());
                    } while (!that.fetchedAll() && items.size() <= 0);
                    that.completeFetch(items, response.getResponseHeaders());
                    that.produceCompleteCallback.run(that, items.size(), requestCharge);
                }
                catch (Exception ex) {
                    LOGGER.debug("DocumentProducer Id: {}, Exception in FetchAsync: {}", (Object)that.targetRange.getId(), (Object)ex.getMessage());
                    exceptionFetchResult = new FetchResult(ex);
                }
                if (exceptionFetchResult != null) {
                    that.updateRequestContinuationToken(that.currentBackendContinuationToken);
                    that.fetchResultBuffer.add(exceptionFetchResult);
                }
                return null;
            }
        };
        this.fetchScheduler.schedule(callable);
    }

    public boolean tryScheduleFetch() {
        if (this.fetchedAll()) {
            return false;
        }
        if (!this.isFetching.compareAndSet(false, true)) {
            return false;
        }
        this.scheduleFetch();
        return true;
    }

    public boolean moveNext() throws Exception {
        this.moveNextInvocationCount.incrementAndGet();
        if (this.isDone) {
            return false;
        }
        if (this.shouldFetch()) {
            this.tryScheduleFetch();
        }
        if (this.moveNextInternal()) {
            this.isAtContinuationBoundary = false;
            --this.itemsTillNextContinuationBoundary;
            return true;
        }
        FetchResult fetchResult = this.fetchResultBuffer.take();
        switch (fetchResult.type) {
            case Done: {
                this.isDone = true;
                this.itemsTillNextContinuationBoundary = 0;
                return false;
            }
            case Exception: {
                throw fetchResult.exception;
            }
            case Result: {
                this.updateStates(fetchResult.results, fetchResult.headerResponse);
                return true;
            }
        }
        throw new IllegalStateException(fetchResult.type.name());
    }

    private boolean moveNextInternal() {
        if (this.currentIterator == null || !this.currentIterator.hasNext()) {
            return false;
        }
        this.currentDocument = (Document)this.currentIterator.next();
        this.bufferedDocumentsCount.decrementAndGet();
        return true;
    }

    public boolean hasStarted() {
        return this.hasStarted;
    }

    public String getId() {
        return this.targetRange.getId();
    }

    public boolean equals(Object obj) {
        if (!(obj instanceof DocumentProducer)) {
            return false;
        }
        return this.getId().compareTo(((DocumentProducer)obj).getId()) == 0;
    }

    public int hashCode() {
        return this.getId().hashCode();
    }

    public boolean fetchedAll() {
        return this.hasStarted && StringUtils.isEmpty((CharSequence)this.currentBackendContinuationToken);
    }

    public Document peek() {
        if (this.isDone) {
            throw new IllegalStateException("Producer is closed");
        }
        return this.currentDocument;
    }

    public int getBufferedDocumentsCount() {
        return this.bufferedDocumentsCount.get();
    }

    public int getPreviousResponseItemCount() {
        return this.previousResponseItemCount;
    }

    public Map<String, String> getPreviousResponseHeaders() {
        return this.previousResponseHeaders;
    }

    public PartitionKeyRange getTargetRange() {
        return this.targetRange;
    }

    public String getCurrentBackendContinuationToken() {
        return this.currentBackendContinuationToken;
    }

    public int getPageSize() {
        return this.pageSize;
    }

    void notifyStop() {
        LOGGER.trace("notifyStop");
        this.fetchResultBuffer.add(FetchResult.DoneResult);
        this.currentBackendContinuationToken = null;
    }

    private void updateStates(List<T> docs, Map<String, String> headerResponse) {
        this.previousResponseContinuation = this.responseContinuation;
        this.responseContinuation = headerResponse.get("x-ms-continuation");
        this.itemsTillNextContinuationBoundary = docs.size();
        this.isAtContinuationBoundary = true;
        this.currentIterator = docs.iterator();
        LOGGER.trace("id {} Fetched Count: {}", (Object)this.getTargetRange().getId(), (Object)docs.size());
        this.moveNextInternal();
    }

    private static enum FetchResultType {
        Done,
        Exception,
        Result;

    }

    static class FetchResult<T> {
        public FetchResultType type;
        public List<T> results;
        public Exception exception;
        public Map<String, String> headerResponse;
        public static final FetchResult DoneResult = new FetchResult();

        public FetchResult(List<T> items, Map<String, String> responseHeader) {
            this.results = items;
            this.headerResponse = responseHeader;
            this.type = FetchResultType.Result;
        }

        public FetchResult(Exception exception) {
            this.exception = exception;
            this.type = FetchResultType.Exception;
        }

        private FetchResult() {
        }

        static {
            FetchResult.DoneResult.type = FetchResultType.Done;
        }
    }
}

