/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.webserver.grpc;

import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.helidon.common.LazyValue;
import io.helidon.common.buffers.BufferData;
import io.helidon.grpc.core.GrpcHeadersUtil;
import io.helidon.http.Header;
import io.helidon.http.HeaderName;
import io.helidon.http.HeaderNames;
import io.helidon.http.HeaderValues;
import io.helidon.http.Headers;
import io.helidon.http.Status;
import io.helidon.http.WritableHeaders;
import io.helidon.http.http2.Http2ErrorCode;
import io.helidon.http.http2.Http2Flag;
import io.helidon.http.http2.Http2FrameData;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2FrameTypes;
import io.helidon.http.http2.Http2Headers;
import io.helidon.http.http2.Http2RstStream;
import io.helidon.http.http2.Http2StreamState;
import io.helidon.http.http2.Http2StreamWriter;
import io.helidon.http.http2.Http2WindowUpdate;
import io.helidon.http.http2.StreamFlowControl;
import io.helidon.metrics.api.Counter;
import io.helidon.metrics.api.DistributionSummary;
import io.helidon.metrics.api.Meter;
import io.helidon.metrics.api.MeterRegistry;
import io.helidon.metrics.api.Metrics;
import io.helidon.metrics.api.Tag;
import io.helidon.metrics.api.Timer;
import io.helidon.webserver.grpc.GrpcConfig;
import io.helidon.webserver.grpc.GrpcRouteHandler;
import io.helidon.webserver.grpc.GrpcStatus;
import io.helidon.webserver.http2.spi.Http2SubProtocolSelector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class GrpcProtocolHandler<REQ, RES>
implements Http2SubProtocolSelector.SubProtocolHandler {
    // Logger used for initialization, request-processing and response failures.
    private static final System.Logger LOGGER = System.getLogger(GrpcProtocolHandler.class.getName());
    // Header names used to negotiate message compression with the client.
    private static final HeaderName GRPC_ENCODING = HeaderNames.create((String)"grpc-encoding");
    private static final HeaderName GRPC_ACCEPT_ENCODING = HeaderNames.create((String)"grpc-accept-encoding");
    // Cached response headers: the gRPC content type and the "identity" (no compression) encoding.
    private static final Header GRPC_CONTENT_TYPE = HeaderValues.createCached((HeaderName)HeaderNames.CONTENT_TYPE, (String)"application/grpc");
    private static final Header GRPC_ENCODING_IDENTITY = HeaderValues.createCached((HeaderName)GRPC_ENCODING, (String)"identity");
    // Flags (value 0) used for every DATA frame written by sendMessage.
    private static final Http2Flag.DataFlags DATA_FLAGS_ZERO = Http2Flag.DataFlags.create((int)0);
    // Default gRPC codec registries consulted by initCompression.
    private static final DecompressorRegistry DECOMPRESSOR_REGISTRY = DecompressorRegistry.getDefaultInstance();
    private static final CompressorRegistry COMPRESSOR_REGISTRY = CompressorRegistry.getDefaultInstance();
    // Tag attached to the duration and message-size meters created in initMetrics.
    private static final Tag OK_TAG = Tag.create((String)"grpc.status", (String)"OK");
    // Lazily created, shared across handler instances; keyed by the full gRPC method name (see initMetrics).
    private static final LazyValue<Map<String, MethodMetrics>> METHOD_METRICS = LazyValue.create(ConcurrentHashMap::new);
    // Size of the message prefix read in data(): 1 flag byte + unsigned 32-bit length.
    // NOTE(review): not referenced by name in this listing; the literal 5 is used instead (likely inlined by the compiler).
    private static final int GRPC_HEADER_SIZE = 5;
    // Initial capacity of the read and write buffers.
    // NOTE(review): not referenced by name in this listing; the literal 16384 is used instead.
    private static final int INITIAL_BUFFER_SIZE = 16384;
    // HTTP/2 request headers of the stream; converted to gRPC Metadata in init().
    private final Http2Headers headers;
    // Writer used for response headers, data frames and trailers.
    private final Http2StreamWriter streamWriter;
    private final int streamId;
    // Route supplying the method descriptor and the call handler.
    private final GrpcRouteHandler<REQ, RES> route;
    // Messages requested through ServerCall.request(int) and not yet delivered to the listener.
    private final AtomicInteger numMessages = new AtomicInteger();
    // Parsed requests waiting to be delivered to the listener (see flushQueue).
    private final LinkedBlockingQueue<REQ> listenerQueue = new LinkedBlockingQueue();
    // Outbound side is passed to every write on streamWriter.
    private final StreamFlowControl flowControl;
    private final GrpcConfig grpcConfig;
    // Assigned in init() from ServerCallHandler.startCall; null until then.
    private volatile ServerCall.Listener<REQ> listener;
    // Buffer accumulating the payload of the message currently being read; null between messages.
    private BufferData entityBytes;
    // Reusable read buffer handed out by allocateReadBuffer; replaced when a larger message arrives.
    private BufferData readBufferData = BufferData.create((int)16384);
    // Bytes left over from a previous DATA frame when fewer than 5 prefix bytes were available.
    private BufferData unreadBufferData;
    // Payload bytes still missing for the message currently being read.
    private long entityBytesLeft;
    // Codecs chosen by initCompression; both may stay null when no compression is negotiated.
    private Compressor compressor;
    private Decompressor decompressor;
    // True when responses are sent uncompressed (no compressor, or the identity codec).
    private boolean identityCompressor;
    // Total payload bytes received on this stream; recorded as a metric at end of stream.
    private long bytesReceived;
    // Only assigned when metrics are enabled (see init()).
    private MethodMetrics methodMetrics;
    // Wall-clock start of the call in milliseconds; only set when metrics are enabled.
    private long startMillis;
    // Set by rstStream: true when the reset carried the CANCEL error code.
    private volatile boolean callCancelled;
    // Current HTTP/2 state of the stream; advanced through nextStreamState.
    private final AtomicReference<Http2StreamState> currentStreamState = new AtomicReference();

    /**
     * Creates a handler bound to a single HTTP/2 stream.
     *
     * @param headers            request headers of the stream
     * @param streamWriter       writer used for all outbound frames
     * @param streamId           identifier of the HTTP/2 stream
     * @param flowControl        flow control of the stream
     * @param currentStreamState state of the stream at the time the handler is created
     * @param route              route providing the method descriptor and call handler
     * @param grpcConfig         gRPC protocol configuration
     */
    GrpcProtocolHandler(Http2Headers headers,
                        Http2StreamWriter streamWriter,
                        int streamId,
                        StreamFlowControl flowControl,
                        Http2StreamState currentStreamState,
                        GrpcRouteHandler<REQ, RES> route,
                        GrpcConfig grpcConfig) {
        // request side
        this.headers = headers;
        this.route = route;
        this.grpcConfig = grpcConfig;

        // stream side
        this.streamId = streamId;
        this.streamWriter = streamWriter;
        this.flowControl = flowControl;
        this.currentStreamState.set(currentStreamState);
    }

    /**
     * Starts the gRPC call on this stream: creates the server call, negotiates compression,
     * optionally sets up metrics, and hands the call to the route's call handler.
     * Any failure is logged and rethrown unchanged.
     */
    public void init() {
        try {
            ServerCall<REQ, RES> call = createServerCall();
            Headers requestHeaders = headers.httpHeaders();

            // compression is negotiated before the handler is started
            initCompression(call, requestHeaders);

            if (grpcConfig.enableMetrics()) {
                initMetrics();
                startMillis = System.currentTimeMillis();
                methodMetrics.callStarted.increment();
            }

            ServerCallHandler<REQ, RES> handler = route.callHandler();
            listener = handler.startCall(call, GrpcHeadersUtil.toMetadata(headers));
            listener.onReady();
            bytesReceived = 0L;
        } catch (Throwable e) {
            LOGGER.log(System.Logger.Level.ERROR, "Failed to initialize grpc protocol handler", e);
            throw e;
        }
    }

    /**
     * Returns the stream state currently tracked by this handler.
     *
     * @return current HTTP/2 stream state
     */
    public Http2StreamState streamState() {
        return currentStreamState.get();
    }

    /**
     * Handles a stream reset: records whether it was a cancellation, notifies the listener
     * and moves the stream state towards half-closed (remote).
     *
     * @param rstStream the reset received for this stream
     */
    public void rstStream(Http2RstStream rstStream) {
        Http2ErrorCode errorCode = rstStream.errorCode();
        callCancelled = Http2ErrorCode.CANCEL == errorCode;
        listener.onCancel();
        currentStreamState.updateAndGet(
                state -> nextStreamState(state, Http2StreamState.HALF_CLOSED_REMOTE));
    }

    /**
     * Callback for a window update on this stream; this handler takes no action on it.
     *
     * @param update the window update received (unused)
     */
    public void windowUpdate(Http2WindowUpdate update) {
    }

    /**
     * Compressed flag read from the prefix of the message currently being assembled.
     * It is kept as instance state because a single message may arrive split over several
     * DATA frames, i.e. several calls to {@link #data(Http2FrameHeader, BufferData)}.
     */
    private boolean entityCompressed;

    /**
     * Processes one DATA frame of the request.
     * <p>
     * The payload is a sequence of length-prefixed messages: a 1-byte compressed flag and an
     * unsigned 32-bit length, followed by the message bytes. A message, or even its prefix,
     * may be split across frames, so partial state is kept between calls. Each completed
     * message is parsed, queued and offered to the listener. When the frame carries the
     * end-of-stream flag the listener is half-closed and the stream state is advanced.
     * <p>
     * Any exception cancels the call through the listener and is logged, not rethrown.
     *
     * @param header frame header, used to detect end of stream
     * @param data   frame payload
     */
    public void data(Http2FrameHeader header, BufferData data) {
        try {
            BufferData newData;
            if (unreadBufferData != null) {
                // a previous frame ended in the middle of a message prefix; prepend the leftover
                newData = BufferData.create(unreadBufferData, data);
                unreadBufferData = null;
            } else {
                newData = data;
            }

            while (newData.available() > 0) {
                if (entityBytes == null) {
                    // start of a new message
                    if (newData.available() < GRPC_HEADER_SIZE) {
                        // fragmented prefix, wait for the next frame
                        unreadBufferData = newData;
                        return;
                    }
                    // remember the flag for the whole message, not just for this frame
                    entityCompressed = newData.read() == 1;
                    entityBytesLeft = newData.readUnsignedInt32();
                    entityBytes = allocateReadBuffer((int) entityBytesLeft);
                }

                int writableNow = (int) Math.min(entityBytesLeft, (long) newData.available());
                entityBytes.write(newData, writableNow);
                entityBytesLeft -= writableNow;
                if (entityBytesLeft != 0L) {
                    // message continues in the next frame
                    continue;
                }

                if (entityCompressed && decompressor == null) {
                    throw new IllegalStateException("Unable to codec for compressed data");
                }
                bytesReceived += entityBytes.available();
                BufferDataInputStream is = new BufferDataInputStream(entityBytes);
                REQ request = route.method().parseRequest(entityCompressed ? decompressor.decompress(is) : is);
                listenerQueue.add(request);
                flushQueue();
                entityBytes = null;
            }

            if (((Http2Flag.DataFlags) header.flags(Http2FrameTypes.DATA)).endOfStream()) {
                listener.onHalfClose();
                currentStreamState.updateAndGet(
                        current -> nextStreamState(current, Http2StreamState.HALF_CLOSED_REMOTE));
                if (grpcConfig.enableMetrics()) {
                    methodMetrics.recvMessageSize.record((double) bytesReceived);
                }
            }
        } catch (Exception e) {
            listener.onCancel();
            LOGGER.log(System.Logger.Level.ERROR, "Failed to process grpc request: " + data.debugDataHex(true), (Throwable) e);
        }
    }

    /**
     * Returns a buffer able to hold a message of the given length, reusing the current
     * read buffer when it is large enough and allocating a bigger one otherwise.
     *
     * @param length message length in bytes; callers obtain it by narrowing an unsigned
     *               32-bit value, so lengths above {@link Integer#MAX_VALUE} show up negative
     * @return the (reset) buffer to write the message into
     * @throws IllegalStateException if the length is negative or a new buffer would exceed
     *                               the configured maximum read buffer size
     */
    BufferData allocateReadBuffer(int length) {
        if (length < 0) {
            // overflowed unsigned length: it must not slip past the size check below
            throw new IllegalStateException("gRPC message size exceeds max read buffer size");
        }
        readBufferData.reset();
        if (length > readBufferData.capacity()) {
            if (length > grpcConfig.maxReadBufferSize()) {
                throw new IllegalStateException("gRPC message size exceeds max read buffer size");
            }
            readBufferData = BufferData.create(length);
        }
        return readBufferData;
    }

    /**
     * Selects the compressor/decompressor pair for this call.
     * <p>
     * When compression is enabled and the request declares {@code grpc-encoding}, that encoding
     * is used for both directions; if either codec is unknown the call is closed with
     * {@code UNIMPLEMENTED}, the supported encodings are advertised, and the method returns
     * without updating the identity flag. Otherwise the first {@code grpc-accept-encoding}
     * value that has both a compressor and a decompressor is chosen.
     *
     * @param serverCall  call to close when the requested encoding is not supported
     * @param httpHeaders request headers
     */
    void initCompression(ServerCall<REQ, RES> serverCall, Headers httpHeaders) {
        if (grpcConfig.enableCompression()) {
            if (httpHeaders.contains(GRPC_ENCODING)) {
                String requested = httpHeaders.get(GRPC_ENCODING).asString().get();
                decompressor = DECOMPRESSOR_REGISTRY.lookupDecompressor(requested);
                compressor = COMPRESSOR_REGISTRY.lookupCompressor(requested);

                boolean supported = decompressor != null && compressor != null;
                if (!supported) {
                    Set<String> advertised = DECOMPRESSOR_REGISTRY.getAdvertisedMessageEncodings();
                    Metadata.Key<String> acceptKey = Metadata.Key.of(GRPC_ACCEPT_ENCODING.defaultCase(),
                                                                     Metadata.ASCII_STRING_MARSHALLER);
                    Metadata trailers = new Metadata();
                    trailers.put(acceptKey, String.join(",", advertised));
                    serverCall.close(io.grpc.Status.UNIMPLEMENTED, trailers);
                    currentStreamState.updateAndGet(state -> nextStreamState(state, Http2StreamState.CLOSED));
                    return;
                }
            } else if (httpHeaders.contains(GRPC_ACCEPT_ENCODING)) {
                for (String candidate : httpHeaders.get(GRPC_ACCEPT_ENCODING).allValues()) {
                    compressor = COMPRESSOR_REGISTRY.lookupCompressor(candidate);
                    if (compressor != null) {
                        decompressor = DECOMPRESSOR_REGISTRY.lookupDecompressor(candidate);
                        if (decompressor != null) {
                            break;
                        }
                        // compressor without a matching decompressor is not usable
                        compressor = null;
                    }
                }
            }
        }

        if (compressor == null) {
            identityCompressor = true;
        } else {
            identityCompressor = compressor instanceof Codec.Identity;
        }
    }

    /**
     * Tells whether responses are written without compression.
     *
     * @return {@code true} if no compressor or the identity codec is in use
     */
    boolean identityCompressor() {
        return identityCompressor;
    }

    /**
     * Increases the number of messages the listener is willing to receive.
     *
     * @param n number of additional messages requested
     */
    private void addNumMessages(int n) {
        numMessages.addAndGet(n);
    }

    /**
     * Delivers queued requests to the listener while there is outstanding demand.
     * <p>
     * Demand is only consumed when it is positive, so the counter never drops below zero:
     * a message that arrives before {@code request(n)} stays queued and is delivered as soon
     * as demand is added. If the queue turns out to be empty after demand was reserved
     * (another flush took the message), the reserved demand is returned instead of handing
     * {@code null} to the listener.
     * Does nothing while no listener has been assigned yet.
     */
    private void flushQueue() {
        ServerCall.Listener<REQ> current = listener;
        if (current == null) {
            return;
        }
        while (!listenerQueue.isEmpty()) {
            // reserve one unit of demand, but only if there is any
            int demandBefore = numMessages.getAndUpdate(n -> n > 0 ? n - 1 : n);
            if (demandBefore <= 0) {
                return;
            }
            REQ message = listenerQueue.poll();
            if (message == null) {
                // lost the race for the last queued message; give the demand back
                numMessages.incrementAndGet();
                return;
            }
            current.onMessage(message);
        }
    }

    /**
     * Computes the state a stream moves to when one side requests a transition.
     * A half-close requested while the opposite side is already half-closed yields
     * {@code CLOSED}; any other desired state is returned as is.
     *
     * @param currentStreamState state the stream is in now
     * @param desiredStreamState state being requested
     * @return the resulting stream state
     */
    static Http2StreamState nextStreamState(Http2StreamState currentStreamState, Http2StreamState desiredStreamState) {
        return switch (desiredStreamState) {
            case HALF_CLOSED_LOCAL -> currentStreamState == Http2StreamState.HALF_CLOSED_REMOTE
                    ? Http2StreamState.CLOSED
                    : Http2StreamState.HALF_CLOSED_LOCAL;
            case HALF_CLOSED_REMOTE -> currentStreamState == Http2StreamState.HALF_CLOSED_LOCAL
                    ? Http2StreamState.CLOSED
                    : Http2StreamState.HALF_CLOSED_REMOTE;
            default -> desiredStreamState;
        };
    }

    /**
     * Creates the {@link ServerCall} through which the application sends response headers,
     * messages and the final status for this stream.
     *
     * @return a new server call bound to this handler's stream
     */
    private ServerCall<REQ, RES> createServerCall() {
        return new ServerCall<REQ, RES>() {
            // total bytes written in DATA frames, recorded as a metric on successful close
            private long bytesSent;
            // whether sendHeaders already wrote the response headers
            private boolean headersSent;
            // reusable write buffer; replaced by a larger fixed buffer when needed
            private BufferData writeBufferData = BufferData.growing(INITIAL_BUFFER_SIZE);

            /**
             * Adds demand for {@code numMessages} more requests and delivers anything queued.
             */
            public void request(int numMessages) {
                GrpcProtocolHandler.this.addNumMessages(numMessages);
                GrpcProtocolHandler.this.flushQueue();
            }

            /**
             * Writes the response headers: application metadata, the gRPC content type,
             * the negotiated message encoding and HTTP status 200.
             */
            public void sendHeaders(Metadata headers) {
                WritableHeaders writable = WritableHeaders.create();
                GrpcHeadersUtil.updateHeaders(writable, headers);
                writable.set(GRPC_CONTENT_TYPE);

                Compressor activeCompressor = GrpcProtocolHandler.this.compressor;
                if (activeCompressor == null) {
                    writable.set(GRPC_ENCODING_IDENTITY);
                } else {
                    writable.set(HeaderValues.createCached(GRPC_ENCODING, activeCompressor.getMessageEncoding()));
                }

                Http2Headers http2Headers = Http2Headers.create(writable);
                http2Headers.status(Status.OK_200);
                // flag value 4 is END_HEADERS in HTTP/2
                GrpcProtocolHandler.this.streamWriter.writeHeaders(http2Headers,
                                                                   GrpcProtocolHandler.this.streamId,
                                                                   Http2Flag.HeaderFlags.create(4),
                                                                   GrpcProtocolHandler.this.flowControl.outbound());
                this.headersSent = true;
            }

            /**
             * Serializes one response message, prefixes it with the compressed flag and its
             * length, and writes it as a DATA frame. An I/O failure cancels the call through
             * the listener and is logged.
             */
            public void sendMessage(RES message) {
                try (InputStream inputStream = GrpcProtocolHandler.this.route.method().streamResponse(message)) {
                    BufferData bufferData;
                    if (GrpcProtocolHandler.this.identityCompressor && inputStream instanceof KnownLength) {
                        // length is known up front: copy straight into the frame buffer
                        int bytesLength = inputStream.available();
                        bufferData = this.allocateWriteBuffer(GRPC_HEADER_SIZE + bytesLength);
                        bufferData.write(0);
                        bufferData.writeUnsignedInt32((long) bytesLength);
                        bufferData.readFrom(inputStream);
                    } else {
                        // length unknown or compression needed: materialize the payload first
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        if (GrpcProtocolHandler.this.identityCompressor) {
                            inputStream.transferTo(baos);
                        } else {
                            try (OutputStream os = GrpcProtocolHandler.this.compressor.compress(baos)) {
                                inputStream.transferTo(os);
                            }
                        }
                        byte[] bytes = baos.toByteArray();
                        bufferData = this.allocateWriteBuffer(GRPC_HEADER_SIZE + bytes.length);
                        bufferData.write(GrpcProtocolHandler.this.identityCompressor ? 0 : 1);
                        bufferData.writeUnsignedInt32((long) bytes.length);
                        bufferData.write(bytes);
                    }

                    int writeLength = bufferData.available();
                    Http2FrameHeader header = Http2FrameHeader.create(writeLength,
                                                                      Http2FrameTypes.DATA,
                                                                      DATA_FLAGS_ZERO,
                                                                      GrpcProtocolHandler.this.streamId);
                    GrpcProtocolHandler.this.streamWriter.writeData(new Http2FrameData(header, bufferData),
                                                                    GrpcProtocolHandler.this.flowControl.outbound());
                    this.bytesSent += (long) writeLength;
                } catch (IOException e) {
                    GrpcProtocolHandler.this.listener.onCancel();
                    LOGGER.log(System.Logger.Level.ERROR, "Failed to respond to grpc request: " + String.valueOf(GrpcProtocolHandler.this.route.method()), (Throwable) e);
                }
            }

            /**
             * Writes the trailers (status, optional message, application metadata) and ends
             * the stream. If no headers were sent yet, the trailers also carry the content
             * type and HTTP status. Size and duration metrics are recorded for OK calls only.
             */
            public void close(io.grpc.Status status, Metadata trailers) {
                WritableHeaders writable = WritableHeaders.create();
                if (!this.headersSent) {
                    writable.set(GRPC_CONTENT_TYPE);
                }
                GrpcHeadersUtil.updateHeaders(writable, trailers);

                // a cancelled call always reports CANCELLED, whatever status the application passed
                int statusValue = GrpcProtocolHandler.this.callCancelled
                        ? io.grpc.Status.CANCELLED.getCode().value()
                        : status.getCode().value();
                writable.set(HeaderValues.create(GrpcStatus.STATUS_NAME, statusValue));
                String description = status.getDescription();
                if (description != null) {
                    writable.set(HeaderValues.create(GrpcStatus.MESSAGE_NAME, description));
                }

                Http2Headers http2Headers = Http2Headers.create(writable);
                if (!this.headersSent) {
                    http2Headers.status(Status.OK_200);
                }
                // flag value 5 is END_HEADERS | END_STREAM in HTTP/2
                GrpcProtocolHandler.this.streamWriter.writeHeaders(http2Headers,
                                                                   GrpcProtocolHandler.this.streamId,
                                                                   Http2Flag.HeaderFlags.create(5),
                                                                   GrpcProtocolHandler.this.flowControl.outbound());
                GrpcProtocolHandler.this.currentStreamState.updateAndGet(
                        current -> GrpcProtocolHandler.nextStreamState(current, Http2StreamState.HALF_CLOSED_LOCAL));

                // close() can be reached from initCompression, before startCall has produced a
                // listener; in that case there is nobody to notify
                ServerCall.Listener<REQ> currentListener = GrpcProtocolHandler.this.listener;
                if (!GrpcProtocolHandler.this.callCancelled && currentListener != null) {
                    currentListener.onComplete();
                }

                if (status.isOk() && GrpcProtocolHandler.this.grpcConfig.enableMetrics()) {
                    GrpcProtocolHandler.this.methodMetrics.sentMessageSize.record((double) this.bytesSent);
                    GrpcProtocolHandler.this.methodMetrics.callDuration.record(
                            Duration.ofMillis(System.currentTimeMillis() - GrpcProtocolHandler.this.startMillis));
                }
            }

            /**
             * Reports the call as cancelled once the client has cancelled it (stream reset
             * with the CANCEL code) or the stream is closed.
             */
            public boolean isCancelled() {
                return GrpcProtocolHandler.this.callCancelled
                        || GrpcProtocolHandler.this.currentStreamState.get() == Http2StreamState.CLOSED;
            }

            public MethodDescriptor<REQ, RES> getMethodDescriptor() {
                return GrpcProtocolHandler.this.route.method();
            }

            /**
             * Resets the write buffer and replaces it with a larger fixed-size one when
             * {@code length} exceeds its capacity.
             */
            private BufferData allocateWriteBuffer(int length) {
                this.writeBufferData.reset();
                if (length > this.writeBufferData.capacity()) {
                    this.writeBufferData = BufferData.create(length);
                }
                return this.writeBufferData;
            }
        };
    }

    /**
     * Looks up, or creates on first use, the meters for the method served by this route
     * and stores them in {@code methodMetrics}.
     */
    private void initMetrics() {
        String fullMethodName = route.method().getFullMethodName();
        methodMetrics = METHOD_METRICS.get()
                .computeIfAbsent(fullMethodName, GrpcProtocolHandler::createMethodMetrics);
    }

    /**
     * Registers the vendor-scoped meters for one gRPC method in the global registry:
     * a started-calls counter tagged with the method, plus a duration timer and two
     * message-size summaries tagged with the method and the OK status.
     *
     * @param name full gRPC method name, used as the {@code grpc.method} tag value
     * @return the meters for the method
     */
    private static MethodMetrics createMethodMetrics(String name) {
        MeterRegistry registry = Metrics.globalRegistry();
        Tag methodTag = Tag.create("grpc.method", name);

        Counter.Builder startedBuilder = Counter.builder("grpc.server.call.started")
                .scope("vendor")
                .tags(List.of(methodTag));
        Counter started = registry.getOrCreate(startedBuilder);

        Timer.Builder durationBuilder = Timer.builder("grpc.server.call.duration")
                .scope("vendor")
                .baseUnit("milliseconds")
                .tags(List.of(methodTag, OK_TAG));
        Timer duration = registry.getOrCreate(durationBuilder);

        DistributionSummary.Builder sentBuilder = DistributionSummary
                .builder("grpc.server.call.sent_total_compressed_message_size")
                .scope("vendor")
                .tags(List.of(methodTag, OK_TAG));
        DistributionSummary sentSize = registry.getOrCreate(sentBuilder);

        DistributionSummary.Builder receivedBuilder = DistributionSummary
                .builder("grpc.server.call.rcvd_total_compressed_message_size")
                .scope("vendor")
                .tags(List.of(methodTag, OK_TAG));
        DistributionSummary receivedSize = registry.getOrCreate(receivedBuilder);

        return new MethodMetrics(started, duration, sentSize, receivedSize);
    }

    /**
     * Meters kept for a single gRPC method.
     *
     * @param callStarted     counter incremented when a call is initialized
     * @param callDuration    timer recorded when a call closes with an OK status
     * @param sentMessageSize total bytes sent, recorded when a call closes with an OK status
     * @param recvMessageSize total bytes received, recorded at end of the request stream
     */
    private record MethodMetrics(
            Counter callStarted,
            Timer callDuration,
            DistributionSummary sentMessageSize,
            DistributionSummary recvMessageSize) {
    }

    /**
     * {@link InputStream} view over a {@link BufferData}, reporting its remaining size
     * through {@link KnownLength}.
     * <p>
     * The read methods enforce the {@code InputStream} end-of-stream contract themselves
     * (return {@code -1} once the buffer is drained, {@code 0} for a zero-length read) and
     * do not rely on how the underlying buffer behaves when it is empty.
     */
    static class BufferDataInputStream
    extends InputStream
    implements KnownLength {
        private final BufferData bufferData;

        /**
         * @param bufferData buffer to read from; it is consumed as the stream is read
         */
        BufferDataInputStream(BufferData bufferData) {
            this.bufferData = bufferData;
        }

        @Override
        public int read() {
            if (this.bufferData.available() <= 0) {
                return -1;
            }
            return this.bufferData.read();
        }

        @Override
        public int read(byte[] b) {
            if (b.length == 0) {
                return 0;
            }
            if (this.bufferData.available() <= 0) {
                return -1;
            }
            return this.bufferData.read(b);
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (len == 0) {
                return 0;
            }
            if (this.bufferData.available() <= 0) {
                return -1;
            }
            return this.bufferData.read(b, off, len);
        }

        /**
         * @return number of bytes still available in the underlying buffer
         */
        @Override
        public int available() {
            return this.bufferData.available();
        }
    }
}

