/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.http.server;

import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.net.HostName;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.OperationStatus;
import com.yahoo.vespa.http.server.DocumentOperationMessageV3;
import com.yahoo.vespa.http.server.ErrorHttpResponse;
import com.yahoo.vespa.http.server.FeedReaderFactory;
import com.yahoo.vespa.http.server.FeedResponse;
import com.yahoo.vespa.http.server.FeederSettings;
import com.yahoo.vespa.http.server.ReplyContext;
import com.yahoo.vespa.http.server.StreamReaderV3;
import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
import com.yahoo.yolean.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Feeds the document operations carried by V3 HTTP feed requests of one client.
 *
 * <p>Each request body is read operation by operation through a {@link StreamReaderV3},
 * every operation is sent through the shared source session, and the statuses collected
 * so far are returned to the client in a {@link FeedResponse}.
 */
class ClientFeederV3 {
    protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName());

    /** Seconds since the last recorded activity after which an idle feeder counts as timed out. */
    private static final long IDLE_TIMEOUT_SECONDS = 6000L;
    /** Poll interval used while waiting for in-flight requests in {@link #kill()}. */
    private static final long KILL_POLL_MILLIS = 100L;
    /** Pause between send attempts in {@link #sendMessage}. */
    private static final long SEND_RETRY_MILLIS = 100L;
    /** Requests are denied (when the client asked for it) once the available-thread count drops below this. */
    private static final int OVERLOAD_THRESHOLD = -10;
    /** Error code that makes {@link #sendMessage} retry; presumably the message bus "send queue full" code - TODO confirm. */
    private static final int RETRYABLE_SEND_ERROR_CODE = 100001;
    /** Error code reported to the client with the "condition not met" flag set. */
    private static final int CONDITION_NOT_MET_ERROR_CODE = 251013;

    /**
     * Number of accepted operations whose status has not yet been moved into a response.
     * Static, hence shared by all feeder instances in this class loader.
     */
    private static final AtomicInteger outstandingOperations = new AtomicInteger(0);

    /** Statuses waiting to be returned; handed to every message through its {@link ReplyContext}. */
    private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<>();
    private final ReferencedResource<SharedSourceSession> sourceSession;
    private final String clientId;
    private final ReplyHandler feedReplyHandler;
    private final Metric metric;
    /** Start of the current ops/sec window; also read by {@link #timedOut()}. Guarded by {@link #monitor}. */
    private Instant prevOpsPerSecTime = Instant.now();
    /** Operations accepted in the current ops/sec window. Guarded by {@link #monitor}. */
    private double operationsForOpsPerSec = 0.0;
    private final Object monitor = new Object();
    private final StreamReaderV3 streamReaderV3;
    /** Number of {@link #handleRequest} invocations currently executing. */
    private final AtomicInteger ongoingRequests = new AtomicInteger(0);
    private final String hostName;
    private final AtomicInteger threadsAvailableForFeeding;

    ClientFeederV3(ReferencedResource<SharedSourceSession> sourceSession,
                   FeedReaderFactory feedReaderFactory,
                   DocumentTypeManager docTypeManager,
                   String clientId,
                   Metric metric,
                   ReplyHandler feedReplyHandler,
                   AtomicInteger threadsAvailableForFeeding) {
        this.sourceSession = sourceSession;
        this.clientId = clientId;
        this.feedReplyHandler = feedReplyHandler;
        this.metric = metric;
        this.threadsAvailableForFeeding = threadsAvailableForFeeding;
        this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager);
        this.hostName = HostName.getLocalhost();
    }

    /**
     * Tells whether this feeder has been idle long enough to be discarded.
     *
     * @return true when the ops/sec window start is more than {@value #IDLE_TIMEOUT_SECONDS}
     *         seconds in the past and no request is currently being handled
     */
    public boolean timedOut() {
        synchronized (monitor) {
            return Instant.now().isAfter(prevOpsPerSecTime.plusSeconds(IDLE_TIMEOUT_SECONDS))
                    && ongoingRequests.get() == 0;
        }
    }

    /**
     * Waits for in-flight requests to finish, then closes this feeder's reference to the source session.
     *
     * <p>The wait is a sleep-based poll. Calling {@code wait()} on the counter without owning its
     * monitor would throw {@link IllegalMonitorStateException} and skip the close entirely.
     * If the waiting thread is interrupted the wait is abandoned, the reference is still closed,
     * and the interrupt status is restored afterwards.
     */
    public void kill() {
        boolean interrupted = false;
        while (ongoingRequests.get() > 0) {
            try {
                Thread.sleep(KILL_POLL_MILLIS);
            } catch (InterruptedException e) {
                interrupted = true;
                break;
            }
        }
        try {
            sourceSession.getReference().close();
        } finally {
            if (interrupted) {
                // Restore only after closing so the close itself runs with a clean interrupt status.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Moves every status currently queued in {@link #feedReplies} into the given queue,
     * decrementing the outstanding-operation counter once per moved status.
     *
     * @param operations destination queue for the drained statuses
     * @throws InterruptedException if interrupted while putting into {@code operations}
     */
    private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException {
        for (OperationStatus status = feedReplies.poll(); status != null; status = feedReplies.poll()) {
            outstandingOperations.decrementAndGet();
            operations.put(status);
        }
    }

    /**
     * Feeds all operations found in the request body and returns the statuses gathered so far.
     *
     * <p>The available-thread and ongoing-request counters are adjusted on entry and always
     * restored on exit. The returned reply queue always ends with an
     * {@link ErrorCode#END_OF_FEED} status, also when feeding failed.
     *
     * @param request the incoming feed request
     * @return an {@link ErrorHttpResponse} when the request is denied because of overload,
     *         otherwise a {@link FeedResponse} carrying the collected statuses
     * @throws IOException if reading the settings or opening the request stream fails
     */
    public HttpResponse handleRequest(HttpRequest request) throws IOException {
        threadsAvailableForFeeding.decrementAndGet();
        ongoingRequests.incrementAndGet();
        try {
            FeederSettings feederSettings = new FeederSettings(request);
            if (feederSettings.denyIfBusy && threadsAvailableForFeeding.get() < OVERLOAD_THRESHOLD) {
                return new ErrorHttpResponse(getOverloadReturnCode(request), "Gateway overloaded");
            }
            InputStream requestInputStream = StreamReaderV3.unzipStreamIfNeeded(request);
            BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>();
            try {
                feed(feederSettings, requestInputStream, replies, threadsAvailableForFeeding);
                synchronized (monitor) {
                    // Earlier replies are only attached to requests that carry the data format header.
                    if (request.getJDiscRequest().headers().get("X-Yahoo-Feed-Data-Format") != null) {
                        transferPreviousRepliesToResponse(replies);
                    }
                }
            } catch (InterruptedException ignored) {
                // Feeding is simply cut short; whatever was collected is still returned.
                // NOTE(review): the interrupt status is intentionally not restored here, as that
                // could make later blocking reads of the reply queue fail - confirm with callers.
            } catch (Throwable e) {
                log.log(LogLevel.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
            } finally {
                replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null));
            }
            return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName);
        } finally {
            ongoingRequests.decrementAndGet();
            threadsAvailableForFeeding.incrementAndGet();
        }
    }

    /**
     * Picks the HTTP status used when a request is denied because of overload.
     *
     * @return 299 when the request has an {@code X-Yahoo-Silent-Upgrade} header, otherwise 429
     */
    private int getOverloadReturnCode(HttpRequest request) {
        return request.getHeader("X-Yahoo-Silent-Upgrade") != null ? 299 : 429;
    }

    /**
     * Reads the next operation from the request stream and turns it into a message.
     *
     * <p>Operations that fail to parse are reported as {@link ErrorCode#ERROR} statuses in
     * {@code repliesFromOldMessages} and skipped; reading continues with the next operation.
     *
     * @return the next message with its route applied, or empty when the operation id cannot be
     *         read, the stream has no more operation ids, or no message could be created
     */
    private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings,
                                                                        InputStream requestInputStream,
                                                                        BlockingQueue<OperationStatus> repliesFromOldMessages) {
        while (true) {
            Optional<String> operationId;
            try {
                operationId = streamReaderV3.getNextOperationId(requestInputStream);
            } catch (IOException ioe) {
                if (log.isLoggable(LogLevel.DEBUG)) {
                    log.log(LogLevel.DEBUG, Exceptions.toMessageString(ioe), ioe);
                }
                return Optional.empty();
            }
            if (!operationId.isPresent()) {
                return Optional.empty();
            }

            DocumentOperationMessageV3 message;
            try {
                message = getNextMessage(operationId.get(), requestInputStream, settings);
            } catch (Exception e) {
                if (log.isLoggable(LogLevel.DEBUG)) {
                    log.log(LogLevel.DEBUG, Exceptions.toMessageString(e), e);
                }
                repliesFromOldMessages.add(
                        new OperationStatus(Exceptions.toMessageString(e), operationId.get(), ErrorCode.ERROR, false, ""));
                continue; // try the next operation in the stream
            }

            if (message != null) {
                setRoute(message, settings);
            }
            return Optional.ofNullable(message);
        }
    }

    /**
     * Sends one message through the source session.
     *
     * <p>When the client asked to be denied if busy and no feeding thread is available, a single
     * non-blocking send is made and its result returned as is. Otherwise a blocking send is used
     * and repeated, with a short pause, for as long as it fails with
     * {@link #RETRYABLE_SEND_ERROR_CODE}.
     *
     * @return the result of the last send attempt
     * @throws InterruptedException if interrupted while sending or pausing
     */
    private Result sendMessage(FeederSettings settings,
                               DocumentOperationMessageV3 msg,
                               AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
        Result result = null;
        while (result == null || result.getError().getCode() == RETRYABLE_SEND_ERROR_CODE) {
            msg.getMessage().pushHandler(feedReplyHandler);
            if (settings.denyIfBusy && threadsAvailableForFeeding.get() < 1) {
                return sourceSession.getResource().sendMessage(msg.getMessage());
            }
            result = sourceSession.getResource().sendMessageBlocking(msg.getMessage());
            if (result.isAccepted()) {
                return result;
            }
            Thread.sleep(SEND_RETRY_MILLIS);
        }
        return result;
    }

    /**
     * Sends every message that can be pulled from the request stream.
     *
     * <p>Accepted messages only bump the outstanding counter and the ops/sec metric. A message
     * that cannot be sent produces a status in {@code repliesFromOldMessages} right away:
     * {@link ErrorCode#TRANSIENT_ERROR} for non-fatal errors, {@link ErrorCode#ERROR} for
     * runtime exceptions and fatal errors.
     *
     * @throws InterruptedException if interrupted while sending
     */
    private void feed(FeederSettings settings,
                      InputStream requestInputStream,
                      BlockingQueue<OperationStatus> repliesFromOldMessages,
                      AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
        Optional<DocumentOperationMessageV3> next;
        while ((next = pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages)).isPresent()) {
            DocumentOperationMessageV3 message = next.get();
            setMessageParameters(message, settings);

            Result result;
            try {
                result = sendMessage(settings, message, threadsAvailableForFeeding);
            } catch (RuntimeException e) {
                repliesFromOldMessages.add(createOperationStatus(
                        message.getOperationId(), Exceptions.toMessageString(e), ErrorCode.ERROR, false, message.getMessage()));
                continue;
            }

            if (result.isAccepted()) {
                outstandingOperations.incrementAndGet();
                updateOpsPerSec();
                log(LogLevel.DEBUG, "Sent message successfully, document id: ", message.getOperationId());
            } else if (!result.getError().isFatal()) {
                repliesFromOldMessages.add(createOperationStatus(
                        message.getOperationId(), result.getError().getMessage(), ErrorCode.TRANSIENT_ERROR, false, message.getMessage()));
            } else {
                boolean isConditionNotMet = result.getError().getCode() == CONDITION_NOT_MET_ERROR_CODE;
                repliesFromOldMessages.add(createOperationStatus(
                        message.getOperationId(), result.getError().getMessage(), ErrorCode.ERROR, isConditionNotMet, message.getMessage()));
            }
        }
    }

    /**
     * Builds a status for the given operation.
     *
     * @param msg the message the status concerns, may be null; its trace is included as text
     *            only when it has a trace with a level above 0, otherwise the trace text is empty
     */
    private OperationStatus createOperationStatus(String id, String message, ErrorCode code, boolean isConditionNotMet, Message msg) {
        boolean hasTrace = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0;
        String traceMessage = hasTrace ? msg.getTrace().toString() : "";
        return new OperationStatus(message, id, code, isConditionNotMet, traceMessage);
    }

    /**
     * Reads the next operation from the stream and wraps it in a message.
     *
     * <p>Also publishes the session's pending count (when a session exists) and counts the
     * operation in the {@code httpapi_num_operations} metric once a message has been created.
     *
     * @return the created message, or null when no message could be created from the operation
     * @throws Exception if reading the operation or creating the message fails
     */
    protected DocumentOperationMessageV3 getNextMessage(String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception {
        VespaXMLFeedReader.Operation operation = streamReaderV3.getNextOperation(requestInputStream, settings);
        if (sourceSession.getResource().session() != null) {
            metric.set("httpapi_pending", sourceSession.getResource().session().getPendingCount(), null);
        }
        DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, operationId, metric);
        if (message == null) {
            return null;
        }
        metric.add("httpapi_num_operations", 1, null);
        log(LogLevel.DEBUG, "Successfully deserialized document id: ", message.getOperationId());
        return message;
    }

    /**
     * Attaches the reply context to the message and applies the optional trace level and
     * priority from the settings. An unknown priority name is logged and otherwise ignored.
     */
    private void setMessageParameters(DocumentOperationMessageV3 msg, FeederSettings settings) {
        msg.getMessage().setContext(new ReplyContext(msg.getOperationId(), feedReplies));
        if (settings.traceLevel != null) {
            msg.getMessage().getTrace().setLevel(settings.traceLevel.intValue());
        }
        if (settings.priority == null) {
            return;
        }
        try {
            DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority);
            if (msg.getMessage() instanceof DocumentMessage) {
                ((DocumentMessage) msg.getMessage()).setPriority(priority);
            }
        } catch (IllegalArgumentException i) {
            log.severe(i.getMessage());
        }
    }

    /** Applies the route from the settings to the message, when one is given. */
    private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) {
        if (settings.route != null) {
            msg.getMessage().setRoute(settings.route);
        }
    }

    /**
     * Logs the concatenation of the given parts, building the string only when the level is enabled.
     *
     * @param level    the level to log at
     * @param msgParts message fragments; a null fragment is rendered as {@code "null"}
     */
    protected final void log(LogLevel level, Object... msgParts) {
        if (!log.isLoggable(level)) {
            return;
        }
        StringBuilder s = new StringBuilder();
        for (Object part : msgParts) {
            s.append(part); // append(Object) is null-safe, unlike part.toString()
        }
        log.log(level, s.toString());
    }

    /**
     * Counts one accepted operation and, once more than a second has passed since the current
     * window started, publishes the window's rate as {@code httpapi_ops_per_sec} and starts a
     * new window containing this operation.
     *
     * <p>The window start is also what {@link #timedOut()} reads, so it may trail the most
     * recent accepted operation by up to one second.
     */
    private void updateOpsPerSec() {
        Instant now = Instant.now();
        synchronized (monitor) {
            if (now.isAfter(prevOpsPerSecTime.plusSeconds(1L))) {
                // Elapsed time is strictly more than one second here, so the division is safe.
                Duration elapsed = Duration.between(prevOpsPerSecTime, now);
                double opsPerSec = operationsForOpsPerSec / (elapsed.toMillis() / 1000.0);
                metric.set("httpapi_ops_per_sec", opsPerSec, null);
                operationsForOpsPerSec = 1.0;
                prevOpsPerSecTime = now;
            } else {
                operationsForOpsPerSec += 1.0;
            }
        }
    }
}

