/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.document.restapi;

import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.restapi.DocumentOperationExecutor;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Response;
import com.yahoo.documentapi.ResponseHandler;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorControlSession;
import com.yahoo.documentapi.VisitorDataHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.yolean.Exceptions;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DocumentOperationExecutorImpl
implements DocumentOperationExecutor {
    private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName());
    // Timeout applied to each visitor session, and quoted in the visit timeout error message.
    private final Duration visitTimeout;
    // Capacity of the throttled (retry) queue; also quoted in the overload error message.
    private final long maxThrottled;
    // Used to create the async session and visitor sessions.
    private final DocumentAccess access;
    // Session through which get/put/update/remove operations are dispatched.
    private final AsyncSession asyncSession;
    // Known content clusters, keyed by cluster name.
    private final Map<String, StorageCluster> clusters;
    private final Clock clock;
    // Operations which got a transient error from the session, and are waiting to be re-sent.
    private final DelayQueue throttled;
    // All accepted operations; those still unhandled when their delay expires are failed with TIMEOUT.
    private final DelayQueue timeouts;
    // Ongoing visitor sessions, keyed by their control handler.
    private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<VisitorControlHandler, VisitorSession>();
    // Destroys completed visitor sessions on a separate thread, outside the visitor completion callback.
    private final ExecutorService visitSessionShutdownExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new DaemonThreadFactory("visit-session-shutdown-"));

    /**
     * Creates an executor from config: delays, timeouts and queue capacity are read from
     * {@code executorConfig}, and the cluster map is parsed from the two cluster configs.
     */
    public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
        this(Duration.ofMillis(executorConfig.resendDelayMillis()),
             Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
             Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
             executorConfig.maxThrottled(),
             access,
             parseClusters(clustersConfig, bucketsConfig),
             clock);
    }

    DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled, DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
        this.visitTimeout = Objects.requireNonNull(visitTimeout);
        this.maxThrottled = maxThrottled;
        this.access = Objects.requireNonNull(access);
        this.asyncSession = access.createAsyncSession(new AsyncParameters());
        this.clock = Objects.requireNonNull(clock);
        this.clusters = Map.copyOf(clusters);
        this.throttled = new DelayQueue(maxThrottled, this::send, Duration.ZERO, clock, "throttle");
        this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> {
            context.error(DocumentOperationExecutor.ErrorType.TIMEOUT, "Timed out after " + defaultTimeout);
            return true;
        }, defaultTimeout, clock, "timeout");
    }

    /**
     * Translates the given visitor options into {@link VisitorParameters}.
     *
     * The document selection is the conjunction of the explicit selection, the document type,
     * the namespace and the group, for those that are present; each part is parenthesised.
     *
     * @throws IllegalArgumentException if neither cluster nor document type is set, if the wanted
     *         document count or the concurrency is not positive, or if the cluster or bucket space
     *         cannot be resolved
     */
    private static VisitorParameters asParameters(DocumentOperationExecutor.VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
        if (options.cluster.isEmpty() && options.documentType.isEmpty()) {
            throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
        }

        String selection = Stream.of(options.selection,
                                     options.documentType,
                                     options.namespace.map(value -> "id.namespace=='" + value + "'"),
                                     options.group.map(DocumentOperationExecutor.Group::selection))
                                 .flatMap(Optional::stream)
                                 .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), StringJoiner::add, StringJoiner::merge)
                                 .toString();
        VisitorParameters parameters = new VisitorParameters(selection);
        options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);

        // Without an explicit field set, use the document fields of the given type, or everything.
        String defaultFieldSet = options.documentType.map(type -> type + ":[document]").orElse("[all]");
        parameters.setFieldSet(options.fieldSet.orElse(defaultFieldSet));

        // The defaults are positive, so validating the resolved value only rejects explicit bad input.
        int wantedDocumentCount = options.wantedDocumentCount.orElse(1024);
        if (wantedDocumentCount <= 0) {
            throw new IllegalArgumentException("wantedDocumentCount must be positive");
        }
        parameters.setMaxTotalHits(wantedDocumentCount);

        int concurrency = options.concurrency.orElse(1);
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));

        parameters.setTimeoutMs(visitTimeout.toMillis());
        parameters.visitInconsistentBuckets(true);
        parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);

        StorageCluster storageCluster = resolveCluster(options.cluster, clusters);
        parameters.setRoute(storageCluster.route());
        List<String> knownBucketSpaces = List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace());
        parameters.setBucketSpace(resolveBucket(storageCluster, options.documentType, knownBucketSpaces, options.bucketSpace));
        return parameters;
    }

    /**
     * Shuts down this executor: stops accepting visitor-session cleanup tasks, destroys all
     * registered visitor sessions, and drains both delay queues.
     *
     * The throttle queue gets a 10 second grace period and the timeout queue 15 seconds; all
     * waiting in this method shares a single deadline 20 seconds after entry. Operations left
     * in the queues at the end of their grace period are failed with OVERLOAD and TIMEOUT,
     * respectively.
     *
     * If the calling thread is interrupted while waiting, both queue shutdowns are cancelled
     * and the interrupt status is restored, so the caller can observe the interruption.
     */
    @Override
    public void shutdown() {
        long shutdownMillis = this.clock.instant().plusSeconds(20L).toEpochMilli();
        this.visitSessionShutdownExecutor.shutdown();
        this.visits.values().forEach(VisitorControlSession::destroy);
        Future<?> throttleShutdown = this.throttled.shutdown(Duration.ofSeconds(10L), context -> context.error(DocumentOperationExecutor.ErrorType.OVERLOAD, "Retry on overload failed due to shutdown"));
        Future<?> timeoutShutdown = this.timeouts.shutdown(Duration.ofSeconds(15L), context -> context.error(DocumentOperationExecutor.ErrorType.TIMEOUT, "Timed out due to shutdown"));
        try {
            // Each wait gets whatever remains until the common deadline, but at least 1 ms.
            throttleShutdown.get(Math.max(1L, shutdownMillis - this.clock.millis()), TimeUnit.MILLISECONDS);
            timeoutShutdown.get(Math.max(1L, shutdownMillis - this.clock.millis()), TimeUnit.MILLISECONDS);
            this.visitSessionShutdownExecutor.awaitTermination(Math.max(1L, shutdownMillis - this.clock.millis()), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throttleShutdown.cancel(true);
            timeoutShutdown.cancel(true);
            log.log(Level.WARNING, "Exception shutting down " + this.getClass().getName(), e);
            if (e instanceof InterruptedException) {
                // Do not swallow the interrupt: restore the flag for whoever called us.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Dispatches a get of the document with the given id; the outcome is reported to {@code context}. */
    @Override
    public void get(DocumentId id, DocumentOperationParameters parameters, DocumentOperationExecutor.OperationContext context) {
        ResponseHandler handler = handlerOf(parameters, context);
        accept(() -> asyncSession.get(id, parameters.withResponseHandler(handler)), context);
    }

    /** Dispatches the given put; the outcome is reported to {@code context}. */
    @Override
    public void put(DocumentPut put, DocumentOperationParameters parameters, DocumentOperationExecutor.OperationContext context) {
        ResponseHandler handler = handlerOf(parameters, context);
        accept(() -> asyncSession.put(put, parameters.withResponseHandler(handler)), context);
    }

    /** Dispatches the given update; the outcome is reported to {@code context}. */
    @Override
    public void update(DocumentUpdate update, DocumentOperationParameters parameters, DocumentOperationExecutor.OperationContext context) {
        ResponseHandler handler = handlerOf(parameters, context);
        accept(() -> asyncSession.update(update, parameters.withResponseHandler(handler)), context);
    }

    /** Dispatches a removal of the document with the given id; the outcome is reported to {@code context}. */
    @Override
    public void remove(DocumentId id, DocumentOperationParameters parameters, DocumentOperationExecutor.OperationContext context) {
        ResponseHandler handler = handlerOf(parameters, context);
        accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handler)), context);
    }

    /**
     * Starts a visitor session for the given options, streaming each visited document to
     * {@code context}, and completing {@code context} exactly once when the session is done.
     *
     * Completion is reported as follows:
     * <ul>
     *   <li>TIMEOUT with no buckets visited: a TIMEOUT error;</li>
     *   <li>TIMEOUT with some buckets visited, SUCCESS, or ABORTED: success, with a serialized
     *       continuation token if the progress is not finished;</li>
     *   <li>anything else: an ERROR with the visitor's message, or "Visiting failed".</li>
     * </ul>
     *
     * Invalid options and unparseable selections are reported to {@code context} as BAD_REQUEST,
     * and any other runtime exception as ERROR; this method does not throw those.
     */
    @Override
    public void visit(DocumentOperationExecutor.VisitorOptions options, final DocumentOperationExecutor.VisitOperationsContext context) {
        try {
            final AtomicBoolean done = new AtomicBoolean(false);
            VisitorParameters parameters = DocumentOperationExecutorImpl.asParameters(options, this.clusters, this.visitTimeout);
            parameters.setLocalDataHandler(new DumpVisitorDataHandler() {

                @Override
                public void onDocument(Document doc, long timeStamp) {
                    context.document(doc);
                }

                @Override
                public void onRemove(DocumentId id) {
                    // Removes are not reported to the context.
                }
            });
            parameters.setControlHandler(new VisitorControlHandler() {

                @Override
                public void onDone(VisitorControlHandler.CompletionCode code, String message) {
                    super.onDone(code, message);
                    switch (code) {
                        case TIMEOUT: {
                            if (!this.hasVisitedAnyBuckets()) {
                                context.error(DocumentOperationExecutor.ErrorType.TIMEOUT, "No buckets visited within timeout of " + DocumentOperationExecutorImpl.this.visitTimeout);
                                // The error completes the context; do not also report success below.
                                break;
                            }
                            // Intentional fall-through: a timeout after partial progress is a success with a continuation.
                        }
                        case SUCCESS:
                        case ABORTED: {
                            context.success(Optional.ofNullable(this.getProgress()).filter(progress -> !progress.isFinished()).map(ProgressToken::serializeToString));
                            break;
                        }
                        default: {
                            context.error(DocumentOperationExecutor.ErrorType.ERROR, message != null ? message : "Visiting failed");
                        }
                    }
                    // This may run before the dispatching thread has put the session in the map, below;
                    // the flag lets that thread do the clean-up instead, in that case.
                    done.set(true);
                    DocumentOperationExecutorImpl.this.visits.computeIfPresent(this, (__, session) -> {
                        DocumentOperationExecutorImpl.this.visitSessionShutdownExecutor.execute(() -> session.destroy());
                        return null;
                    });
                }
            });
            this.visits.put(parameters.getControlHandler(), this.access.createVisitorSession(parameters));
            if (done.get()) {
                // The session completed before it was registered, so onDone found nothing to remove.
                this.visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> {
                    this.visitSessionShutdownExecutor.execute(() -> session.destroy());
                    return null;
                });
            }
        }
        catch (ParseException | IllegalArgumentException e) {
            context.error(DocumentOperationExecutor.ErrorType.BAD_REQUEST, Exceptions.toMessageString((Throwable)e));
        }
        catch (RuntimeException e) {
            context.error(DocumentOperationExecutor.ErrorType.ERROR, Exceptions.toMessageString((Throwable)e));
        }
    }

    /**
     * Returns the route to the named content cluster.
     *
     * @throws IllegalArgumentException if there is no such cluster
     */
    @Override
    public String routeToCluster(String cluster) {
        StorageCluster resolved = resolveCluster(Optional.of(cluster), clusters);
        return resolved.route();
    }

    /**
     * Returns a response handler which first forwards the response to any handler already set in
     * {@code parameters}, and then completes {@code context}: with the response's document, if any,
     * on success, and otherwise with an error derived from the response outcome.
     */
    private ResponseHandler handlerOf(DocumentOperationParameters parameters, DocumentOperationExecutor.OperationContext context) {
        return response -> {
            parameters.responseHandler().ifPresent(original -> original.handleResponse(response));
            if (!response.isSuccess()) {
                context.error(toErrorType(response.outcome()), response.getTextMessage());
                return;
            }
            if (response instanceof DocumentResponse) {
                context.success(Optional.ofNullable(((DocumentResponse) response).getDocument()));
            } else {
                context.success(Optional.empty());
            }
        };
    }

    /**
     * Registers the operation for timeout, then sends it right away unless other operations are
     * already waiting for retry. An operation that is not sent, or that should be retried, is put
     * in the throttle queue; if that queue is full, the context is failed with OVERLOAD.
     */
    private void accept(Supplier<Result> operation, DocumentOperationExecutor.OperationContext context) {
        timeouts.add(operation, context);
        // Only try to send directly when nothing is queued ahead of this operation.
        if (throttled.size() > 0L || !send(operation, context)) {
            if (!throttled.add(operation, context)) {
                context.error(DocumentOperationExecutor.ErrorType.OVERLOAD, maxThrottled + " requests already in retry queue");
            }
        }
    }

    /**
     * Runs the operation once and interprets its result.
     *
     * @return false if the result was a transient error, i.e., the operation should be retried;
     *         true otherwise. For any result other than success or transient error, the context
     *         is failed with ERROR before returning.
     */
    private boolean send(Supplier<Result> operation, DocumentOperationExecutor.OperationContext context) {
        Result result = operation.get();
        switch (result.type()) {
            case SUCCESS:
                return true;
            case TRANSIENT_ERROR:
                return false;
            case FATAL_ERROR:
                break;
            default:
                log.log(Level.WARNING, "Unknown result type '" + result.type() + "'");
                break;
        }
        // Fatal and unknown results are both reported as errors, and count as handled.
        context.error(DocumentOperationExecutor.ErrorType.ERROR, result.getError().getMessage());
        return true;
    }

    /**
     * Maps a response outcome to the error type reported to operation contexts.
     * Outcomes without a specific mapping become ERROR; those other than ERROR itself are also logged.
     */
    private static DocumentOperationExecutor.ErrorType toErrorType(Response.Outcome outcome) {
        switch (outcome) {
            case NOT_FOUND:
                return DocumentOperationExecutor.ErrorType.NOT_FOUND;
            case CONDITION_FAILED:
                return DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
            case INSUFFICIENT_STORAGE:
                return DocumentOperationExecutor.ErrorType.INSUFFICIENT_STORAGE;
            case ERROR:
                return DocumentOperationExecutor.ErrorType.ERROR;
            default:
                log.log(Level.WARNING, "Unexpected response outcome: " + outcome);
                return DocumentOperationExecutor.ErrorType.ERROR;
        }
    }

    /**
     * Returns the wanted cluster, or, if none is specified, the only cluster there is.
     *
     * @throws IllegalArgumentException if there are no clusters, if the wanted cluster is unknown,
     *         or if no cluster is specified and there is more than one to choose from
     */
    static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
        if (clusters.isEmpty()) {
            throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
        }

        Optional<StorageCluster> requested = wanted.map(name -> {
            if (clusters.containsKey(name)) {
                return clusters.get(name);
            }
            throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + name + "', only '" + String.join("', '", clusters.keySet()) + "'");
        });
        if (requested.isPresent()) {
            return requested.get();
        }

        // No cluster requested: this is only unambiguous when there is exactly one.
        if (clusters.size() > 1) {
            throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + String.join("', '", clusters.keySet()) + "'");
        }
        return clusters.values().iterator().next();
    }

    /**
     * Resolves the bucket space to visit: the one the document type maps to in the given cluster,
     * if a document type is given; otherwise the explicitly requested bucket space, if any;
     * otherwise the default bucket space.
     *
     * @throws IllegalArgumentException if the document type has no bucket space in the cluster,
     *         or if the requested bucket space is not among {@code bucketSpaces}
     */
    private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, List<String> bucketSpaces, Optional<String> bucketSpace) {
        if (documentType.isPresent()) {
            String type = documentType.get();
            return cluster.bucketOf(type)
                          .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + "' is not mapped to a known bucket space"));
        }
        if (bucketSpace.isPresent()) {
            String space = bucketSpace.get();
            if (!bucketSpaces.contains(space)) {
                throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " + String.join(", ", bucketSpaces));
            }
            return space;
        }
        return FixedBucketSpaces.defaultSpace();
    }

    /**
     * Builds an unmodifiable map from cluster name to {@link StorageCluster}, one entry per storage
     * cluster in {@code clusters}, each with the document type to bucket space mapping which
     * {@code buckets} holds for that cluster name.
     */
    private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
        return clusters.storage().stream()
                       .collect(Collectors.toUnmodifiableMap(
                               storage -> storage.name(),
                               storage -> {
                                   Map<String, String> bucketSpaceByType = buckets.cluster(storage.name())
                                                                                  .documentType().entrySet().stream()
                                                                                  .collect(Collectors.toMap(entry -> entry.getKey(),
                                                                                                            entry -> entry.getValue().bucketSpace()));
                                   return new StorageCluster(storage.name(), storage.configid(), bucketSpaceByType);
                               }));
    }

    /** Returns the async session which document operations are dispatched through. */
    AsyncSession asyncSession() {
        return asyncSession;
    }

    /** Returns a view of the control handlers of the visitor sessions currently registered. */
    Collection<VisitorControlHandler> visitorSessions() {
        return visits.keySet();
    }

    /**
     * Wakes up the maintainer of each delay queue, in turn, and blocks until notified back
     * on that queue's monitor. Each maintainer calls {@code notify()} on its queue right
     * before it starts waiting, which is what releases this caller.
     *
     * @throws InterruptedException if interrupted while waiting on either queue
     */
    void notifyMaintainers() throws InterruptedException {
        synchronized (throttled) {
            throttled.notify();
            throttled.wait();
        }
        synchronized (timeouts) {
            timeouts.notify();
            timeouts.wait();
        }
    }

    /** A content cluster: its name, its config id, and the bucket space of each of its document types. */
    static class StorageCluster {
        private final String name;
        private final String configId;
        private final Map<String, String> documentBuckets;

        /** Creates a cluster with the given non-null name and config id, and a copy of the given mapping. */
        StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
            this.name = Objects.requireNonNull(name);
            this.configId = Objects.requireNonNull(configId);
            this.documentBuckets = Map.copyOf(documentBuckets);
        }

        /** Returns the name of this cluster. */
        String name() {
            return name;
        }

        /** Returns the config id of this cluster. */
        String configId() {
            return configId;
        }

        /** Returns the route string for this cluster, built from its name and config id. */
        String route() {
            String clusterName = name();
            String clusterConfigId = configId();
            return "[Storage:cluster=" + clusterName + ";clusterconfigid=" + clusterConfigId + "]";
        }

        /** Returns the bucket space of the given document type, or empty if the type is not mapped. */
        Optional<String> bucketOf(String documentType) {
            String bucketSpace = documentBuckets.get(documentType);
            return Optional.ofNullable(bucketSpace);
        }
    }

    /** An operation and its context, together with the instant at which it becomes ready for processing. */
    private static class Delayed {
        private final Supplier<Result> operation;
        private final DocumentOperationExecutor.OperationContext context;
        private final Instant readyAt;

        /** Creates a delayed entry; all arguments must be non-null. */
        Delayed(Instant readyAt, Supplier<Result> operation, DocumentOperationExecutor.OperationContext context) {
            this.readyAt = Objects.requireNonNull(readyAt);
            this.context = Objects.requireNonNull(context);
            this.operation = Objects.requireNonNull(operation);
        }

        /** Returns the operation to run. */
        Supplier<Result> operation() {
            return operation;
        }

        /** Returns the context which the outcome of the operation is reported to. */
        DocumentOperationExecutor.OperationContext context() {
            return context;
        }

        /** Returns the instant at which this entry becomes ready. */
        Instant readyAt() {
            return readyAt;
        }
    }

    /**
     * A bounded queue of operations which each become ready a fixed delay after being added,
     * and which are then processed, in queue order, by a dedicated daemon maintainer thread.
     *
     * The maintainer applies the given action to each ready entry: a {@code true} result removes
     * the entry, a {@code false} result keeps it for a later attempt. Entries whose context is
     * already handled are removed without the action being applied.
     */
    static class DelayQueue {
        private final long maxSize;
        private final Clock clock;
        private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
        // Tracked separately from the queue, and incremented before insertion, to enforce maxSize.
        private final AtomicLong size = new AtomicLong(0L);
        private final Thread maintainer;
        private final Duration delay;
        // How long the maintainer waits when the queue gives it nothing to wait for.
        private final long defaultWaitMillis;

        /**
         * Creates the queue and starts its maintainer thread.
         *
         * @param maxSize    the maximum number of queued entries; must be non-negative
         * @param action     applied to ready entries; returns whether the entry is done and should be removed
         * @param delay      time from an entry is added until it becomes ready; must be non-negative
         * @param clock      time source
         * @param threadName suffix for the name given to the maintainer thread factory
         * @throws IllegalArgumentException if {@code maxSize} or {@code delay} is negative
         */
        public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, DocumentOperationExecutor.OperationContext> action, Duration delay, Clock clock, String threadName) {
            if (maxSize < 0L) {
                throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
            }
            if (delay.isNegative()) {
                throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
            }
            this.maxSize = maxSize;
            this.delay = delay;
            this.defaultWaitMillis = Math.min(delay.toMillis(), 100L);
            this.clock = Objects.requireNonNull(clock);
            this.maintainer = new DaemonThreadFactory("document-operation-executor-" + threadName).newThread(() -> this.maintain(action));
            this.maintainer.start();
        }

        /**
         * Adds the operation, to become ready after this queue's delay.
         *
         * @return false, without queueing anything, if the queue is already at max size; true otherwise
         */
        boolean add(Supplier<Result> operation, DocumentOperationExecutor.OperationContext context) {
            // Reserve a slot first, and give it back if that took us past the limit.
            if (this.size.incrementAndGet() > this.maxSize) {
                this.size.decrementAndGet();
                return false;
            }
            return this.queue.add(new Delayed(this.clock.instant().plus(this.delay), operation, context));
        }

        /** Returns the current number of reserved slots, i.e., queued entries plus any in-flight additions. */
        long size() {
            return this.size.get();
        }

        /**
         * Starts shutting down this queue on a separate thread: waits, polling every 100 ms, until
         * the queue is empty or the grace period is over, then interrupts the maintainer and hands
         * the context of each remaining entry to {@code onShutdown}.
         *
         * @return a future which completes when the above is done
         */
        Future<?> shutdown(Duration grace, Consumer<DocumentOperationExecutor.OperationContext> onShutdown) {
            ExecutorService shutdownService = Executors.newSingleThreadExecutor();
            Future<Object> future = shutdownService.submit(() -> {
                try {
                    long doomMillis = this.clock.millis() + grace.toMillis();
                    while (this.size.get() > 0L && this.clock.millis() < doomMillis) {
                        Thread.sleep(100L);
                    }
                }
                finally {
                    // Runs also if the wait above is interrupted, so remaining contexts are always completed.
                    Delayed delayed;
                    this.maintainer.interrupt();
                    while ((delayed = this.queue.poll()) != null) {
                        this.size.decrementAndGet();
                        onShutdown.accept(delayed.context());
                    }
                }
                return null;
            });
            // Lets the single worker thread terminate once the submitted task completes.
            shutdownService.shutdown();
            return future;
        }

        /**
         * The maintainer loop; runs until the maintainer thread is interrupted.
         *
         * Each pass walks the queue in order: handled entries are removed; ready entries have the
         * action applied and are removed if it returns true. Once the action returns false, no
         * further entries are attempted in that pass, and the next pass starts about 10 ms later.
         * Otherwise, the next pass starts when the first remaining entry becomes ready, or after
         * the default wait if nothing remains.
         */
        private void maintain(BiPredicate<Supplier<Result>, DocumentOperationExecutor.OperationContext> action) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Instant waitUntil = null;
                    Iterator<Delayed> operations = this.queue.iterator();
                    boolean rejected = false;
                    while (operations.hasNext()) {
                        Delayed delayed = operations.next();
                        if (delayed.context().handled()) {
                            operations.remove();
                            this.size.decrementAndGet();
                            continue;
                        }
                        if (delayed.readyAt().isBefore(this.clock.instant()) && !rejected) {
                            if (action.test(delayed.operation(), delayed.context())) {
                                operations.remove();
                                this.size.decrementAndGet();
                                continue;
                            }
                            // The action declined this entry: back off briefly, and keep later entries behind it.
                            waitUntil = this.clock.instant().plus(Duration.ofMillis(10L));
                            rejected = true;
                        }
                        // The first entry left in the queue decides how long to wait, unless already decided.
                        waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
                    }
                    long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : this.clock.millis() + this.defaultWaitMillis;
                    synchronized (this) {
                        do {
                            // Release anyone waiting for a completed pass, then wait at least 1 ms ourselves.
                            this.notify();
                            this.wait(Math.max(1L, waitUntilMillis - this.clock.millis()));
                        } while (this.clock.millis() < waitUntilMillis);
                    }
                }
                catch (InterruptedException e) {
                    // Restore the flag, so the loop condition terminates this thread.
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    log.log(Level.SEVERE, "Exception caught by delay queue maintainer", e);
                }
            }
        }
    }
}

