/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ArrowBuf;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ArrowMessage;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.DictionaryUtils;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.StreamPipe;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.auth.AuthConstants;
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.auth.ServerAuthWrapper;
import org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlightService
extends FlightServiceGrpc.FlightServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(FlightService.class);
    private static final int PENDING_REQUESTS = 5;
    private final BufferAllocator allocator;
    private final FlightProducer producer;
    private final ServerAuthHandler authHandler;
    private final ExecutorService executors;

    FlightService(BufferAllocator allocator, FlightProducer producer, ServerAuthHandler authHandler, ExecutorService executors) {
        this.allocator = allocator;
        this.producer = producer;
        this.authHandler = authHandler;
        this.executors = executors;
    }

    @Override
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> responseObserver) {
        return ServerAuthWrapper.wrapHandshake(this.authHandler, responseObserver, this.executors);
    }

    @Override
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightInfo> responseObserver) {
        try {
            this.producer.listFlights(this.makeContext((ServerCallStreamObserver)responseObserver), new Criteria(criteria), StreamPipe.wrap(responseObserver, FlightInfo::toProtocol));
        }
        catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
        }
    }

    private CallContext makeContext(ServerCallStreamObserver<?> responseObserver) {
        return new CallContext((String)AuthConstants.PEER_IDENTITY_KEY.get(), () -> responseObserver.isCancelled());
    }

    public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserver) {
        try {
            this.producer.getStream(this.makeContext((ServerCallStreamObserver)responseObserver), new Ticket(ticket), new GetListener(responseObserver));
        }
        catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
        }
    }

    @Override
    public void doAction(Flight.Action request, StreamObserver<Flight.Result> responseObserver) {
        try {
            this.producer.doAction(this.makeContext((ServerCallStreamObserver)responseObserver), new Action(request), StreamPipe.wrap(responseObserver, Result::toProtocol));
        }
        catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
        }
    }

    @Override
    public void listActions(Flight.Empty request, StreamObserver<Flight.ActionType> responseObserver) {
        try {
            this.producer.listActions(this.makeContext((ServerCallStreamObserver)responseObserver), StreamPipe.wrap(responseObserver, t -> t.toProtocol()));
        }
        catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
        }
    }

    public StreamObserver<ArrowMessage> doPutCustom(StreamObserver<Flight.PutResult> responseObserverSimple) {
        ServerCallStreamObserver responseObserver = (ServerCallStreamObserver)responseObserverSimple;
        responseObserver.disableAutoInboundFlowControl();
        responseObserver.request(1);
        FlightStream fs = new FlightStream(this.allocator, 5, (message, cause) -> responseObserver.onError((Throwable)Status.CANCELLED.withCause(cause).withDescription(message).asException()), arg_0 -> ((ServerCallStreamObserver)responseObserver).request(arg_0));
        this.executors.submit(() -> {
            StreamPipe<PutResult, Flight.PutResult> ackStream = StreamPipe.wrap(responseObserver, PutResult::toProtocol);
            try {
                this.producer.acceptPut(this.makeContext(responseObserver), fs, ackStream).run();
            }
            catch (Exception ex) {
                ackStream.onError(StatusUtils.toGrpcException(ex));
                logger.error("Exception handling DoPut", (Throwable)ex);
            }
            finally {
                ackStream.ensureCompleted();
                try {
                    fs.close();
                }
                catch (Exception e) {
                    logger.error("Exception closing Flight stream", (Throwable)e);
                }
            }
        });
        return fs.asObserver();
    }

    @Override
    public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight.FlightInfo> responseObserver) {
        try {
            FlightInfo info = this.producer.getFlightInfo(this.makeContext((ServerCallStreamObserver)responseObserver), new FlightDescriptor(request));
            responseObserver.onNext((Object)info.toProtocol());
            responseObserver.onCompleted();
        }
        catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
        }
    }

    @Override
    public void getSchema(Flight.FlightDescriptor request, StreamObserver<Flight.SchemaResult> responseObserver) {
        try {
            SchemaResult result = this.producer.getSchema(this.makeContext((ServerCallStreamObserver)responseObserver), new FlightDescriptor(request));
            responseObserver.onNext((Object)result.toProtocol());
            responseObserver.onCompleted();
        }
        catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
        }
    }

    static class CallContext
    implements FlightProducer.CallContext {
        private final String peerIdentity;
        private final BooleanSupplier isCancelled;

        CallContext(String peerIdentity, BooleanSupplier isCancelled) {
            this.peerIdentity = peerIdentity;
            this.isCancelled = isCancelled;
        }

        @Override
        public String peerIdentity() {
            return this.peerIdentity;
        }

        @Override
        public boolean isCancelled() {
            return this.isCancelled.getAsBoolean();
        }
    }

    private static class GetListener
    implements FlightProducer.ServerStreamListener {
        private ServerCallStreamObserver<ArrowMessage> responseObserver;
        private volatile VectorUnloader unloader;

        public GetListener(StreamObserver<ArrowMessage> responseObserver) {
            this.responseObserver = (ServerCallStreamObserver)responseObserver;
            this.responseObserver.setOnCancelHandler(() -> this.onCancel());
            this.responseObserver.disableAutoInboundFlowControl();
        }

        private void onCancel() {
            logger.debug("Stream cancelled by client.");
        }

        @Override
        public boolean isReady() {
            return this.responseObserver.isReady();
        }

        @Override
        public boolean isCancelled() {
            return this.responseObserver.isCancelled();
        }

        @Override
        public void start(VectorSchemaRoot root) {
            this.start(root, (DictionaryProvider)new DictionaryProvider.MapDictionaryProvider(new Dictionary[0]));
        }

        @Override
        public void start(VectorSchemaRoot root, DictionaryProvider provider) {
            this.unloader = new VectorUnloader(root, true, true);
            DictionaryUtils.generateSchemaMessages(root.getSchema(), null, provider, arg_0 -> this.responseObserver.onNext(arg_0));
        }

        @Override
        public void putNext() {
            this.putNext(null);
        }

        @Override
        public void putNext(ArrowBuf metadata) {
            Preconditions.checkNotNull((Object)this.unloader);
            this.responseObserver.onNext((Object)new ArrowMessage(this.unloader.getRecordBatch(), metadata));
        }

        @Override
        public void error(Throwable ex) {
            this.responseObserver.onError(StatusUtils.toGrpcException(ex));
        }

        @Override
        public void completed() {
            this.responseObserver.onCompleted();
        }
    }
}

