/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.opentelemetry.io;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.protobuf.Message;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.opentelemetry.encoding.JsonServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.encoding.ProtobufServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.encoding.ServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.io.RequestContentListener;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponseStatus;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentEncoding;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentType;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;

public class StandardRequestContentListener
implements RequestContentListener {
    private static final int ZERO_MESSAGES = 0;
    private static final byte COMPRESSED = 1;
    private static final String CLIENT_SOCKET_ADDRESS = "client.socket.address";
    private static final String CLIENT_SOCKET_PORT = "client.socket.port";
    private final ServiceRequestReader protobufReader = new ProtobufServiceRequestReader();
    private final ServiceRequestReader jsonReader = new JsonServiceRequestReader();
    private final ComponentLog log;
    private final BlockingQueue<Message> messages;

    public StandardRequestContentListener(ComponentLog log, BlockingQueue<Message> messages) {
        this.log = Objects.requireNonNull(log, "Log required");
        this.messages = Objects.requireNonNull(messages, "Messages required");
    }

    @Override
    public ServiceResponse onRequest(ByteBuffer buffer, ServiceRequestDescription serviceRequestDescription) {
        ServiceResponse serviceResponse;
        Objects.requireNonNull(buffer, "Buffer required");
        Objects.requireNonNull(serviceRequestDescription, "Description required");
        InetSocketAddress remoteAddress = serviceRequestDescription.getRemoteAddress();
        TelemetryContentType contentType = serviceRequestDescription.getContentType();
        if (TelemetryContentType.APPLICATION_GRPC == contentType) {
            try {
                byte compression = buffer.get();
                int messageSize = buffer.getInt();
                this.log.debug("Client Address [{}] Content-Type [{}] Message Size [{}] Compression [{}]", new Object[]{remoteAddress, contentType, messageSize, compression});
                TelemetryContentEncoding bufferEncoding = 1 == compression ? TelemetryContentEncoding.GZIP : TelemetryContentEncoding.NONE;
                InputStream decodedStream = this.getDecodedStream(buffer, bufferEncoding);
                serviceResponse = this.onSupportedRequest(decodedStream, serviceRequestDescription);
            }
            catch (Exception e) {
                this.log.warn("Client Address [{}] Content-Type [{}] processing failed", new Object[]{remoteAddress, contentType, e});
                serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, 0);
            }
        } else if (TelemetryContentType.APPLICATION_PROTOBUF == contentType || TelemetryContentType.APPLICATION_JSON == contentType) {
            try {
                InputStream decodedStream = this.getDecodedStream(buffer, serviceRequestDescription.getContentEncoding());
                serviceResponse = this.onSupportedRequest(decodedStream, serviceRequestDescription);
            }
            catch (Exception e) {
                this.log.warn("Client Address [{}] Content-Type [{}] processing failed", new Object[]{remoteAddress, contentType, e});
                serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, 0);
            }
        } else {
            serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, 0);
        }
        return serviceResponse;
    }

    private ServiceResponse onSupportedRequest(InputStream inputStream, ServiceRequestDescription serviceRequestDescription) throws IOException {
        TelemetryContentType contentType = serviceRequestDescription.getContentType();
        ServiceRequestReader serviceRequestReader = TelemetryContentType.APPLICATION_JSON == contentType ? this.jsonReader : this.protobufReader;
        TelemetryRequestType requestType = serviceRequestDescription.getRequestType();
        List<Object> resourceMessages = inputStream.available() == 0 ? Collections.emptyList() : (TelemetryRequestType.LOGS == requestType ? this.readMessages(inputStream, serviceRequestDescription, ExportLogsServiceRequest.class, serviceRequestReader) : (TelemetryRequestType.METRICS == requestType ? this.readMessages(inputStream, serviceRequestDescription, ExportMetricsServiceRequest.class, serviceRequestReader) : (TelemetryRequestType.TRACES == requestType ? this.readMessages(inputStream, serviceRequestDescription, ExportTraceServiceRequest.class, serviceRequestReader) : null)));
        return this.onMessages(resourceMessages);
    }

    private <T extends Message> List<Message> readMessages(InputStream inputStream, ServiceRequestDescription serviceRequestDescription, Class<T> requestType, ServiceRequestReader serviceRequestReader) {
        ArrayList<Message> messages = new ArrayList<Message>();
        List<KeyValue> clientSocketAttributes = this.getClientSocketAttributes(serviceRequestDescription);
        T parsed = serviceRequestReader.read(inputStream, requestType);
        if (parsed instanceof ExportLogsServiceRequest) {
            ExportLogsServiceRequest request = (ExportLogsServiceRequest)parsed;
            for (ResourceLogs resourceLogs : request.getResourceLogsList()) {
                Resource.Builder resource = resourceLogs.getResource().toBuilder();
                resource.addAllAttributes(clientSocketAttributes);
                ResourceLogs message = resourceLogs.toBuilder().setResource(resource).build();
                messages.add((Message)message);
            }
        } else if (parsed instanceof ExportMetricsServiceRequest) {
            ExportMetricsServiceRequest request = (ExportMetricsServiceRequest)parsed;
            for (ResourceMetrics resourceMetrics : request.getResourceMetricsList()) {
                Resource.Builder resource = resourceMetrics.getResource().toBuilder();
                resource.addAllAttributes(clientSocketAttributes);
                ResourceMetrics message = resourceMetrics.toBuilder().setResource(resource).build();
                messages.add((Message)message);
            }
        } else if (parsed instanceof ExportTraceServiceRequest) {
            ExportTraceServiceRequest request = (ExportTraceServiceRequest)parsed;
            for (ResourceSpans resourceSpans : request.getResourceSpansList()) {
                Resource.Builder resource = resourceSpans.getResource().toBuilder();
                resource.addAllAttributes(clientSocketAttributes);
                ResourceSpans message = resourceSpans.toBuilder().setResource(resource).build();
                messages.add((Message)message);
            }
        } else {
            throw new IllegalArgumentException(String.format("Request Type [%s] not supported", requestType.getName()));
        }
        return messages;
    }

    private List<KeyValue> getClientSocketAttributes(ServiceRequestDescription serviceRequestDescription) {
        InetSocketAddress remoteAddress = serviceRequestDescription.getRemoteAddress();
        InetAddress remoteSocketAddress = remoteAddress.getAddress();
        String socketAddress = remoteSocketAddress.getHostAddress();
        int socketPort = remoteAddress.getPort();
        KeyValue clientSocketAddress = KeyValue.newBuilder().setKey(CLIENT_SOCKET_ADDRESS).setValue(AnyValue.newBuilder().setStringValue(socketAddress)).build();
        KeyValue clientSocketPort = KeyValue.newBuilder().setKey(CLIENT_SOCKET_PORT).setValue(AnyValue.newBuilder().setIntValue((long)socketPort)).build();
        return List.of(clientSocketAddress, clientSocketPort);
    }

    private ServiceResponse onMessages(List<? extends Message> resourceMessages) {
        ServiceResponse serviceResponse;
        if (resourceMessages == null) {
            serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, 0);
        } else if (resourceMessages.isEmpty()) {
            serviceResponse = new ServiceResponse(ServiceResponseStatus.SUCCESS, 0);
        } else {
            int accepted = 0;
            for (Message message : resourceMessages) {
                if (!this.messages.offer(message)) continue;
                ++accepted;
            }
            int rejected = resourceMessages.size() - accepted;
            serviceResponse = 0 == rejected ? new ServiceResponse(ServiceResponseStatus.SUCCESS, 0) : (0 == accepted ? new ServiceResponse(ServiceResponseStatus.UNAVAILABLE, 0) : new ServiceResponse(ServiceResponseStatus.PARTIAL_SUCCESS, rejected));
        }
        return serviceResponse;
    }

    private InputStream getDecodedStream(ByteBuffer buffer, TelemetryContentEncoding contentEncoding) throws IOException {
        Object decodedStream = TelemetryContentEncoding.GZIP == contentEncoding ? new GZIPInputStream((InputStream)new ByteBufferBackedInputStream(buffer)) : new ByteBufferBackedInputStream(buffer);
        return decodedStream;
    }
}

