/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.http.server;

import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.net.HostName;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.OperationStatus;
import com.yahoo.vespa.http.server.DocumentOperationMessageV3;
import com.yahoo.vespa.http.server.FeedReaderFactory;
import com.yahoo.vespa.http.server.FeedResponse;
import com.yahoo.vespa.http.server.FeederSettings;
import com.yahoo.vespa.http.server.ReplyContext;
import com.yahoo.vespa.http.server.StreamReaderV3;
import com.yahoo.vespaxmlparser.FeedOperation;
import com.yahoo.yolean.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class ClientFeederV3 {
    protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName());
    private static final AtomicInteger outstandingOperations = new AtomicInteger(0);
    private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<OperationStatus>();
    private final ReferencedResource<SharedSourceSession> sourceSession;
    private final String clientId;
    private final ReplyHandler feedReplyHandler;
    private final Metric metric;
    private Instant prevOpsPerSecTime = Instant.now();
    private double operationsForOpsPerSec = 0.0;
    private final Object monitor = new Object();
    private final StreamReaderV3 streamReaderV3;
    private final AtomicInteger ongoingRequests = new AtomicInteger(0);
    private final String hostName;

    ClientFeederV3(ReferencedResource<SharedSourceSession> sourceSession, FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager, String clientId, Metric metric, ReplyHandler feedReplyHandler) {
        this.sourceSession = sourceSession;
        this.clientId = clientId;
        this.feedReplyHandler = feedReplyHandler;
        this.metric = metric;
        this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager);
        this.hostName = HostName.getLocalhost();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean timedOut() {
        Object object = this.monitor;
        synchronized (object) {
            return Instant.now().isAfter(this.prevOpsPerSecTime.plusSeconds(6000L)) && this.ongoingRequests.get() == 0;
        }
    }

    public void kill() {
        while (this.ongoingRequests.get() > 0) {
            try {
                this.ongoingRequests.wait(100L);
            }
            catch (InterruptedException e) {
                // empty catch block
                break;
            }
        }
        this.sourceSession.getReference().close();
    }

    private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException {
        OperationStatus status = (OperationStatus)this.feedReplies.poll();
        while (status != null) {
            outstandingOperations.decrementAndGet();
            operations.put(status);
            status = (OperationStatus)this.feedReplies.poll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public HttpResponse handleRequest(HttpRequest request) throws IOException {
        this.ongoingRequests.incrementAndGet();
        try {
            Object object;
            FeederSettings feederSettings = new FeederSettings(request);
            InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request);
            LinkedBlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<OperationStatus>();
            try {
                this.feed(feederSettings, inputStream, replies);
                object = this.monitor;
                synchronized (object) {
                    if (request.getJDiscRequest().headers().get((Object)"X-Yahoo-Feed-Data-Format") != null) {
                        this.transferPreviousRepliesToResponse(replies);
                    }
                }
            }
            catch (InterruptedException interruptedException) {
            }
            catch (Throwable e) {
                log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString((Throwable)e), e);
            }
            finally {
                replies.add(this.createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null));
            }
            object = new FeedResponse(200, replies, 3, this.clientId, outstandingOperations.get(), this.hostName);
            return object;
        }
        finally {
            this.ongoingRequests.decrementAndGet();
        }
    }

    private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings, InputStream requestInputStream, BlockingQueue<OperationStatus> repliesFromOldMessages) {
        while (true) {
            Optional<String> operationId;
            try {
                operationId = this.streamReaderV3.getNextOperationId(requestInputStream);
                if (operationId.isEmpty()) {
                    return Optional.empty();
                }
            }
            catch (IOException ioe) {
                log.log(Level.FINE, () -> Exceptions.toMessageString((Throwable)ioe));
                return Optional.empty();
            }
            try {
                DocumentOperationMessageV3 message = this.getNextMessage(operationId.get(), requestInputStream, settings);
                if (message != null) {
                    this.setRoute(message, settings);
                }
                return Optional.ofNullable(message);
            }
            catch (Exception e) {
                log.log(Level.WARNING, () -> Exceptions.toMessageString((Throwable)e));
                this.metric.add("httpapi_parse_error", (Number)1, null);
                repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString((Throwable)e), operationId.get(), ErrorCode.ERROR, false, ""));
                continue;
            }
            break;
        }
    }

    private Result sendMessage(DocumentOperationMessageV3 msg) {
        msg.getMessage().pushHandler(this.feedReplyHandler);
        return ((SharedSourceSession)this.sourceSession.getResource()).sendMessage(msg.getMessage());
    }

    private void feed(FeederSettings settings, InputStream requestInputStream, BlockingQueue<OperationStatus> repliesFromOldMessages) {
        Optional<DocumentOperationMessageV3> message;
        while (!(message = this.pullMessageFromRequest(settings, requestInputStream, repliesFromOldMessages)).isEmpty()) {
            Result result;
            this.setMessageParameters(message.get(), settings);
            try {
                result = this.sendMessage(message.get());
            }
            catch (RuntimeException e) {
                repliesFromOldMessages.add(this.createOperationStatus(message.get().getOperationId(), Exceptions.toMessageString((Throwable)e), ErrorCode.ERROR, message.get().getMessage()));
                continue;
            }
            if (result.isAccepted()) {
                outstandingOperations.incrementAndGet();
                this.updateOpsPerSec();
                this.log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId());
                continue;
            }
            if (!result.getError().isFatal()) {
                repliesFromOldMessages.add(this.createOperationStatus(message.get().getOperationId(), result.getError().getMessage(), ErrorCode.TRANSIENT_ERROR, message.get().getMessage()));
                continue;
            }
            repliesFromOldMessages.add(this.createOperationStatus(message.get().getOperationId(), result.getError().getMessage(), ErrorCode.ERROR, message.get().getMessage()));
        }
    }

    private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) {
        String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0 ? msg.getTrace().toString() : "";
        return new OperationStatus(message, id, code, false, traceMessage);
    }

    protected DocumentOperationMessageV3 getNextMessage(String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception {
        DocumentOperationMessageV3 message;
        FeedOperation operation = this.streamReaderV3.getNextOperation(requestInputStream, settings);
        if (((SharedSourceSession)this.sourceSession.getResource()).session() != null) {
            this.metric.set("httpapi_pending", (Number)((SharedSourceSession)this.sourceSession.getResource()).session().getPendingCount(), null);
        }
        if ((message = DocumentOperationMessageV3.create(operation, operationId, this.metric)) == null) {
            return null;
        }
        this.metric.add("httpapi_num_operations", (Number)1, null);
        this.log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId());
        return message;
    }

    private void setMessageParameters(DocumentOperationMessageV3 msg, FeederSettings settings) {
        msg.getMessage().setContext((Object)new ReplyContext(msg.getOperationId(), this.feedReplies));
        if (settings.traceLevel != null) {
            msg.getMessage().getTrace().setLevel(settings.traceLevel.intValue());
        }
        if (settings.priority != null) {
            try {
                DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf((String)settings.priority);
                if (msg.getMessage() instanceof DocumentMessage) {
                    ((DocumentMessage)msg.getMessage()).setPriority(priority);
                }
            }
            catch (IllegalArgumentException i) {
                log.severe(i.getMessage());
            }
        }
    }

    private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) {
        if (settings.route != null) {
            msg.getMessage().setRoute(settings.route);
        }
    }

    protected final void log(Level level, Object ... msgParts) {
        if (!log.isLoggable(level)) {
            return;
        }
        StringBuilder s = new StringBuilder();
        for (Object part : msgParts) {
            s.append(part.toString());
        }
        log.log(level, s.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateOpsPerSec() {
        Instant now = Instant.now();
        Object object = this.monitor;
        synchronized (object) {
            if (now.plusSeconds(1L).isAfter(this.prevOpsPerSecTime)) {
                Duration duration = Duration.between(now, this.prevOpsPerSecTime);
                double opsPerSec = this.operationsForOpsPerSec / ((double)duration.toMillis() / 1000.0);
                this.metric.set("httpapi_ops_per_sec", (Number)opsPerSec, null);
                this.operationsForOpsPerSec = 1.0;
                this.prevOpsPerSecTime = now;
            } else {
                this.operationsForOpsPerSec += 1.0;
            }
        }
    }
}

