/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.opentelemetry.io;

import com.google.protobuf.Message;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
import org.apache.nifi.processors.opentelemetry.io.StandardRequestCallback;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;

/**
 * Iterator that drains a queue of telemetry messages into {@link RequestCallback} instances.
 *
 * <p>Each call to {@link #next()} inspects the message at the head of the queue, then removes
 * queued messages of that same class (up to the configured batch size) and wraps them in a
 * single callback. Messages of other classes are left in the queue for subsequent calls.
 */
public class RequestCallbackProvider implements Iterator<RequestCallback> {
    /** Placeholder passed for URI components that are intentionally omitted (user info, query, fragment). */
    private static final String EMPTY_ELEMENT = null;

    private final URI transitBaseUri;
    private final int batchSize;
    private final BlockingQueue<Message> messages;

    /**
     * Creates a provider backed by the given queue.
     *
     * @param transitBaseUri base URI supplying the scheme, host and port of generated transit URIs; required
     * @param batchSize number of messages after which a single callback stops collecting; the value is
     *                  also used as the initial capacity of the per-callback list
     * @param messages queue of messages to drain; required
     * @throws NullPointerException if {@code transitBaseUri} or {@code messages} is {@code null}
     */
    public RequestCallbackProvider(URI transitBaseUri, int batchSize, BlockingQueue<Message> messages) {
        this.transitBaseUri = Objects.requireNonNull(transitBaseUri, "Transit Base URI required");
        this.batchSize = batchSize;
        this.messages = Objects.requireNonNull(messages, "Messages required");
    }

    /**
     * Reports whether the queue currently holds at least one message.
     *
     * @return {@code true} when the queue is not empty
     */
    @Override
    public boolean hasNext() {
        return !messages.isEmpty();
    }

    /**
     * Builds a callback for the next batch of messages sharing the class of the queue head.
     *
     * @return callback carrying the request type, message class, collected messages and transit URI
     * @throws java.util.NoSuchElementException if the queue is empty (raised by {@code Queue.element()})
     * @throws IllegalArgumentException if the head message class is not a supported telemetry type
     */
    @Override
    public RequestCallback next() {
        // element() does not remove the head; removal happens while collecting the batch below.
        final Message head = messages.element();
        final Class<? extends Message> headMessageClass = head.getClass();
        final TelemetryRequestType requestType = getRequestType(headMessageClass);
        final String transitUri = getTransitUri(requestType);
        final List<Message> requestMessages = getRequestMessages(headMessageClass);
        return new StandardRequestCallback(requestType, headMessageClass, requestMessages, transitUri);
    }

    /**
     * Maps a message class to its telemetry request type.
     *
     * @param headMessageClass class of the message at the head of the queue
     * @return LOGS, METRICS or TRACES depending on the resource class
     * @throws IllegalArgumentException if the class is none of the supported resource types
     */
    private TelemetryRequestType getRequestType(Class<?> headMessageClass) {
        if (ResourceLogs.class.isAssignableFrom(headMessageClass)) {
            return TelemetryRequestType.LOGS;
        }
        if (ResourceMetrics.class.isAssignableFrom(headMessageClass)) {
            return TelemetryRequestType.METRICS;
        }
        if (ResourceSpans.class.isAssignableFrom(headMessageClass)) {
            return TelemetryRequestType.TRACES;
        }
        throw new IllegalArgumentException(String.format("Request Class [%s] not supported", headMessageClass));
    }

    /**
     * Builds the transit URI from the base URI scheme, host and port plus the request type path.
     * User info, query and fragment are omitted.
     *
     * @param requestType request type providing the URI path
     * @return string form of the assembled URI
     * @throws RuntimeException wrapping the {@link URISyntaxException} if the components do not form a valid URI
     */
    private String getTransitUri(TelemetryRequestType requestType) {
        final String path = requestType.getPath();
        try {
            final URI uri = new URI(
                    transitBaseUri.getScheme(),
                    EMPTY_ELEMENT,
                    transitBaseUri.getHost(),
                    transitBaseUri.getPort(),
                    path,
                    EMPTY_ELEMENT,
                    EMPTY_ELEMENT
            );
            return uri.toString();
        } catch (URISyntaxException e) {
            final String message = String.format(
                    "Transit URI construction failed for Base URI [%s] Path [%s]", transitBaseUri, path);
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Removes queued messages whose class equals the given class and returns them in queue order.
     *
     * <p>Collection stops as soon as the number of collected messages equals the batch size.
     * Because the check is an equality test performed after each addition, a batch size of zero
     * never matches and all messages of the class are collected.
     *
     * @param headMessageClass class that collected messages must match exactly
     * @return messages removed from the queue; may be empty when no queued message matches
     */
    private List<Message> getRequestMessages(Class<?> headMessageClass) {
        final List<Message> requestMessages = new ArrayList<>(batchSize);
        final Iterator<Message> currentMessages = messages.iterator();
        while (currentMessages.hasNext()) {
            final Message message = currentMessages.next();
            if (headMessageClass.equals(message.getClass())) {
                requestMessages.add(message);
                // Remove through the iterator so the queue is modified safely during traversal.
                currentMessages.remove();
                if (requestMessages.size() == batchSize) {
                    break;
                }
            }
        }
        return requestMessages;
    }
}

