/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.confluent.schemaregistry;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser;
import org.apache.nifi.confluent.schema.ProtobufMessageSchema;
import org.apache.nifi.confluent.schemaregistry.VarintUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schemaregistry.services.MessageName;
import org.apache.nifi.schemaregistry.services.MessageNameResolver;
import org.apache.nifi.schemaregistry.services.SchemaDefinition;
import org.apache.nifi.schemaregistry.services.StandardMessageName;

@Tags(value={"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"})
@CapabilityDescription(value="Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition\nFor Confluent wire format reference see: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format\n")
public class ConfluentProtobufMessageNameResolver
extends AbstractControllerService
implements MessageNameResolver {
    private static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100;
    private static final int MAXIMUM_CACHE_SIZE = 1000;
    private static final int CACHE_EXPIRE_HOURS = 1;
    private Cache<FindMessageNameArguments, MessageName> messageNameCache;

    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.messageNameCache = Caffeine.newBuilder().maximumSize(1000L).expireAfterWrite(Duration.ofHours(1L)).build();
    }

    @OnDisabled
    public void onDisabled(ConfigurationContext context) {
        if (this.messageNameCache != null) {
            this.messageNameCache.invalidateAll();
            this.messageNameCache = null;
        }
    }

    public MessageName getMessageName(Map<String, String> variables, SchemaDefinition schemaDefinition, InputStream inputStream) throws IOException {
        ComponentLog logger = this.getLogger();
        List<Integer> messageIndexes = this.readMessageIndexesFromStream(inputStream);
        logger.debug("Decoded message indexes: {}", new Object[]{messageIndexes});
        FindMessageNameArguments findMessageNameArgs = new FindMessageNameArguments(schemaDefinition, messageIndexes);
        return (MessageName)this.messageNameCache.get((Object)findMessageNameArgs, this::findMessageName);
    }

    private List<Integer> readMessageIndexesFromStream(InputStream inputStream) throws IOException {
        int firstByte = inputStream.read();
        if (firstByte == -1) {
            throw new IOException("Unexpected end of stream while reading message indexes");
        }
        if (firstByte == 0) {
            return List.of(Integer.valueOf(0));
        }
        int arrayLength = VarintUtils.readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte);
        if ((arrayLength = VarintUtils.decodeZigZag(arrayLength)) < 0 || arrayLength > 100) {
            throw new IllegalStateException("Invalid message index array length: " + arrayLength);
        }
        ArrayList<Integer> indexes = new ArrayList<Integer>();
        for (int i = 0; i < arrayLength; ++i) {
            int rawIndex = VarintUtils.readVarintFromStream(inputStream);
            int index = VarintUtils.decodeZigZag(rawIndex);
            indexes.add(index);
        }
        return indexes;
    }

    private MessageName findMessageName(FindMessageNameArguments findMessageNameArguments) {
        try {
            List<Integer> messageIndexes = findMessageNameArguments.messageIndexes();
            String schemaText = findMessageNameArguments.schemaDefinition().getText();
            AntlrProtobufMessageSchemaParser reader = new AntlrProtobufMessageSchemaParser();
            List rootMessages = reader.parse(schemaText);
            if (messageIndexes.isEmpty()) {
                if (!rootMessages.isEmpty()) {
                    return this.getFullyQualifiedName(Collections.singletonList((ProtobufMessageSchema)rootMessages.getFirst()));
                }
                throw new IllegalStateException("No root messages found in schema");
            }
            List currentLevel = rootMessages;
            ArrayList<ProtobufMessageSchema> messagePath = new ArrayList<ProtobufMessageSchema>();
            for (int index : messageIndexes) {
                if (index >= currentLevel.size()) {
                    String msg = String.format("Message index %d out of bounds for level with %d messages. Message indexes: [%s]", index, currentLevel.size(), messageIndexes);
                    throw new IllegalStateException(msg);
                }
                ProtobufMessageSchema currentMessage = (ProtobufMessageSchema)currentLevel.get(index);
                messagePath.add(currentMessage);
                currentLevel = currentMessage.getChildMessageSchemas();
            }
            return this.getFullyQualifiedName(messagePath);
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to parse protobuf schema", e);
        }
    }

    private MessageName getFullyQualifiedName(List<ProtobufMessageSchema> messagePath) {
        ProtobufMessageSchema firstMessage = messagePath.getFirst();
        String fullName = messagePath.stream().map(ProtobufMessageSchema::getName).collect(Collectors.joining("."));
        return new StandardMessageName(firstMessage.getPackageName(), fullName);
    }

    private record FindMessageNameArguments(SchemaDefinition schemaDefinition, List<Integer> messageIndexes) {
    }
}

