/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.IllegalPayloadAccessException;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.DefaultSubscriptionQueryResult;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.GenericQueryResponseMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryResult;

/**
 * {@link QueryGateway} implementation that wraps the given query objects in query messages, passes them through
 * the registered {@link MessageDispatchInterceptor}s and dispatches the result on the configured {@link QueryBus}.
 * <p>
 * Instances are created through the {@link Builder}, obtained via {@link #builder()}.
 */
public class DefaultQueryGateway implements QueryGateway {

    private final QueryBus queryBus;
    private final List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors;

    /**
     * Creates a gateway from the given {@code builder}. The builder is validated first, so a missing
     * {@link QueryBus} is reported as an {@link AxonConfigurationException}.
     *
     * @param builder the {@link Builder} holding the query bus and the initial dispatch interceptors
     */
    protected DefaultQueryGateway(Builder builder) {
        builder.validate();
        this.queryBus = builder.queryBus;
        // Copy instead of sharing the builder's list: otherwise gateways built from the same builder would see
        // each other's registerDispatchInterceptor(...) calls.
        this.dispatchInterceptors = new CopyOnWriteArrayList<>(builder.dispatchInterceptors);
    }

    /**
     * Creates a new, empty {@link Builder} for a {@link DefaultQueryGateway}.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Dispatches a query on the query bus and exposes the payload of its response as a future.
     * <p>
     * The returned future completes exceptionally when the bus future fails, when the response message is
     * exceptional, or when reading the response throws.
     *
     * @param queryName    the name of the query
     * @param query        the query payload, or a message to dispatch as the query
     * @param responseType the expected response type
     * @param <R>          the response payload type
     * @param <Q>          the query type
     * @return a future completing with the response payload
     */
    @Override
    public <R, Q> CompletableFuture<R> query(String queryName, Q query, ResponseType<R> responseType) {
        QueryMessage<Q, R> queryMessage =
                new GenericQueryMessage<>(GenericMessage.asMessage(query), queryName, responseType);
        CompletableFuture<QueryResponseMessage<R>> queryResponse =
                queryBus.query(processInterceptors(queryMessage));
        CompletableFuture<R> result = new CompletableFuture<>();
        // A failed bus future is first turned into an exceptional response message, so that both kinds of
        // failure are handled by the single isExceptional() branch below.
        queryResponse.exceptionally(cause -> GenericQueryResponseMessage.asResponseMessage(
                             responseType.responseMessagePayloadType(), cause))
                     .thenAccept(queryResponseMessage -> {
                         try {
                             if (queryResponseMessage.isExceptional()) {
                                 result.completeExceptionally(queryResponseMessage.exceptionResult());
                             } else {
                                 result.complete(queryResponseMessage.getPayload());
                             }
                         } catch (Exception e) {
                             // Make sure the returned future is always completed, even if reading the
                             // response itself fails.
                             result.completeExceptionally(e);
                         }
                     });
        return result;
    }

    /**
     * Dispatches a scatter-gather query on the query bus and returns the payloads of the responses.
     *
     * @param queryName    the name of the query
     * @param query        the query payload, or a message to dispatch as the query
     * @param responseType the expected response type
     * @param timeout      the timeout handed to the query bus
     * @param timeUnit     the unit of {@code timeout}
     * @param <R>          the response payload type
     * @param <Q>          the query type
     * @return a stream with the payload of each response message returned by the bus
     */
    @Override
    public <R, Q> Stream<R> scatterGather(String queryName, Q query, ResponseType<R> responseType, long timeout,
                                          TimeUnit timeUnit) {
        // Wrap through asMessage, exactly like query(...), so a Message passed as the query is used as the
        // message itself instead of being nested as the payload of a new message.
        QueryMessage<Q, R> queryMessage =
                new GenericQueryMessage<>(GenericMessage.asMessage(query), queryName, responseType);
        return queryBus.scatterGather(processInterceptors(queryMessage), timeout, timeUnit)
                       .map(Message::getPayload);
    }

    /**
     * Dispatches a subscription query on the query bus and unwraps the initial result and the updates to their
     * payloads.
     * <p>
     * Initial results and updates whose payload is {@code null} are filtered out. An
     * {@link IllegalPayloadAccessException} raised on the initial result is replaced by its cause.
     *
     * @param queryName           the name of the query
     * @param query               the query payload
     * @param initialResponseType the expected type of the initial response
     * @param updateResponseType  the expected type of the updates
     * @param backpressure        the backpressure setting handed to the query bus
     * @param updateBufferSize    the update buffer size handed to the query bus
     * @param <Q>                 the query type
     * @param <I>                 the initial response payload type
     * @param <U>                 the update payload type
     * @return a {@link SubscriptionQueryResult} exposing payloads instead of messages
     */
    @Override
    public <Q, I, U> SubscriptionQueryResult<I, U> subscriptionQuery(String queryName, Q query,
                                                                     ResponseType<I> initialResponseType,
                                                                     ResponseType<U> updateResponseType,
                                                                     SubscriptionQueryBackpressure backpressure,
                                                                     int updateBufferSize) {
        // NOTE(review): unlike query(...) and scatterGather(...), the query object is used as the payload
        // directly here; confirm whether it should also go through GenericMessage.asMessage.
        GenericSubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage =
                new GenericSubscriptionQueryMessage<>(query, queryName, initialResponseType, updateResponseType);
        // The update side is typed with a wildcard: only Message#getPayload is needed from update messages.
        SubscriptionQueryResult<QueryResponseMessage<I>, ? extends Message<U>> result =
                queryBus.subscriptionQuery(processInterceptors(subscriptionQueryMessage),
                                           backpressure,
                                           updateBufferSize);
        return new DefaultSubscriptionQueryResult<>(
                result.initialResult()
                      .filter(initialResult -> Objects.nonNull(initialResult.getPayload()))
                      .map(Message::getPayload)
                      .onErrorMap(e -> e instanceof IllegalPayloadAccessException ? e.getCause() : e),
                result.updates()
                      .filter(update -> Objects.nonNull(update.getPayload()))
                      .map(Message::getPayload),
                result
        );
    }

    /**
     * Registers a dispatch interceptor that is applied to every query dispatched after registration.
     *
     * @param interceptor the interceptor to add
     * @return a {@link Registration} that removes the interceptor from this gateway again
     */
    @Override
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?, ?>> interceptor) {
        dispatchInterceptors.add(interceptor);
        return () -> dispatchInterceptors.remove(interceptor);
    }

    /**
     * Passes the given query through all registered dispatch interceptors, in list order, feeding the output of
     * each interceptor into the next one.
     *
     * @param query the query message to intercept
     * @param <Q>   the query payload type
     * @param <R>   the response type
     * @param <T>   the concrete query message type
     * @return the message returned by the last interceptor, or {@code query} itself when none is registered
     */
    @SuppressWarnings("unchecked") // interceptors are trusted to return a message of the type they were given
    private <Q, R, T extends QueryMessage<Q, R>> T processInterceptors(T query) {
        T intercepted = query;
        for (MessageDispatchInterceptor<? super QueryMessage<?, ?>> interceptor : dispatchInterceptors) {
            intercepted = (T) interceptor.handle(intercepted);
        }
        return intercepted;
    }

    /**
     * Builder for a {@link DefaultQueryGateway}. The {@link QueryBus} is mandatory; the list of dispatch
     * interceptors defaults to an empty list.
     */
    public static class Builder {

        private QueryBus queryBus;
        private List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors =
                new CopyOnWriteArrayList<>();

        /**
         * Sets the {@link QueryBus} queries are dispatched on.
         *
         * @param queryBus the query bus, must not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder queryBus(QueryBus queryBus) {
            BuilderUtils.assertNonNull(queryBus, "QueryBus may not be null");
            this.queryBus = queryBus;
            return this;
        }

        /**
         * Sets the dispatch interceptors, replacing any previously configured ones.
         *
         * @param dispatchInterceptors the interceptors to apply to dispatched queries
         * @return this builder, for method chaining
         */
        public Builder dispatchInterceptors(MessageDispatchInterceptor<? super QueryMessage<?, ?>>... dispatchInterceptors) {
            return dispatchInterceptors(Arrays.asList(dispatchInterceptors));
        }

        /**
         * Sets the dispatch interceptors, replacing any previously configured ones. The given list is copied;
         * a {@code null} or empty list results in an empty interceptor list.
         *
         * @param dispatchInterceptors the interceptors to apply to dispatched queries, may be {@code null}
         * @return this builder, for method chaining
         */
        public Builder dispatchInterceptors(List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors) {
            this.dispatchInterceptors = dispatchInterceptors != null && !dispatchInterceptors.isEmpty()
                    ? new CopyOnWriteArrayList<>(dispatchInterceptors)
                    : new CopyOnWriteArrayList<>();
            return this;
        }

        /**
         * Builds a {@link DefaultQueryGateway} from the current state of this builder.
         *
         * @return the new gateway
         */
        public DefaultQueryGateway build() {
            return new DefaultQueryGateway(this);
        }

        /**
         * Checks that all required components have been set.
         *
         * @throws AxonConfigurationException when no {@link QueryBus} has been provided
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(queryBus, "The QueryBus is a hard requirement and should be provided");
        }
    }
}

