/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.query.QuerySerializer;
import org.axonframework.axonserver.connector.query.StreamableResponse;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

/**
 * {@link StreamableResponse} that forwards the elements of a {@link Flux} of query response messages to a
 * {@link ReplyChannel}.
 * <p>
 * Every message emitted by the flux is serialized into a {@link QueryResponse} and sent over the reply channel.
 * This class signals no demand of its own when the subscription is established; elements are requested from the
 * flux only through {@link #request(long)}.
 */
class StreamableFluxResponse implements StreamableResponse {

    /** Handle on the subscription to the result flux; {@link #request(long)} and {@link #cancel()} delegate to it. */
    private final Subscription subscription;

    /**
     * Subscribes to the given {@code result} flux and wires its signals to the given {@code responseHandler}.
     *
     * @param result          the flux of query response messages to stream
     * @param responseHandler the channel the serialized responses, completion and errors are written to
     * @param serializer      serializes each emitted message into a {@link QueryResponse}
     * @param requestId       identifier of the query request, passed to the serializer and set on error responses
     * @param clientId        identifier passed to {@link ExceptionSerializer} when an error is serialized
     * @param <T>             the payload type of the query response messages
     */
    public <T> StreamableFluxResponse(Flux<QueryResponseMessage<T>> result, ReplyChannel<QueryResponse> responseHandler, QuerySerializer serializer, String requestId, String clientId) {
        SendingSubscriber subscriber = new SendingSubscriber(responseHandler, clientId, requestId);
        // The subscriber doubles as the Subscription handle; store it before subscribing.
        this.subscription = subscriber;
        // No raw cast: the compiler verifies that the subscriber accepts the mapped element type.
        result.map(message -> serializer.serializeResponse(message, requestId))
              .subscribeWith(subscriber);
    }

    /**
     * Requests up to {@code requested} further elements from the underlying subscription.
     *
     * @param requested the number of elements to request
     */
    public void request(long requested) {
        subscription.request(requested);
    }

    /**
     * Cancels the underlying subscription.
     */
    public void cancel() {
        subscription.cancel();
    }

    /**
     * Subscriber that writes every signal it receives to a {@link ReplyChannel}.
     */
    private static class SendingSubscriber extends BaseSubscriber<QueryResponse> {

        private final ReplyChannel<QueryResponse> responseHandler;
        private final String clientId;
        private final String requestId;

        /**
         * @param responseHandler the channel to write responses, completion and errors to
         * @param clientId        identifier passed to {@link ExceptionSerializer} when an error is serialized
         * @param requestId       identifier set on the error response
         */
        public SendingSubscriber(ReplyChannel<QueryResponse> responseHandler, String clientId, String requestId) {
            this.responseHandler = responseHandler;
            this.clientId = clientId;
            this.requestId = requestId;
        }

        /**
         * Intentionally empty: no elements are requested when the subscription is established, so demand only
         * arrives through {@code request(long)} calls made on this subscriber.
         * <p>
         * NOTE(review): the inherited hook is documented by Reactor to request unbounded demand; keep this
         * method in place unless that behaviour is actually wanted.
         */
        protected void hookOnSubscribe(Subscription subscription) {
            // Deliberately no request here.
        }

        /**
         * Sends the emitted response over the reply channel.
         */
        protected void hookOnNext(QueryResponse value) {
            responseHandler.send(value);
        }

        /**
         * Completes the reply channel once the flux completes.
         */
        protected void hookOnComplete() {
            responseHandler.complete();
        }

        /**
         * Converts the failure into an error {@link QueryResponse} carrying the error code, the serialized
         * exception and the request identifier, and passes it to {@link ReplyChannel#sendLast}.
         */
        protected void hookOnError(Throwable e) {
            ErrorMessage serializedError = ExceptionSerializer.serialize(clientId, e);
            QueryResponse errorResponse = QueryResponse.newBuilder()
                                                       .setErrorCode(ErrorCode.getQueryExecutionErrorCode(e).errorCode())
                                                       .setErrorMessage(serializedError)
                                                       .setRequestIdentifier(requestId)
                                                       .build();
            responseHandler.sendLast(errorResponse);
        }

        /**
         * Completes the reply channel when the subscription is cancelled.
         */
        protected void hookOnCancel() {
            responseHandler.complete();
        }
    }
}

