/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.pathtemplate.ValidationException;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
import org.apache.nifi.processors.gcp.pubsub.consume.AbstractPubSubMessageConverter;
import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
import org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
import org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;

@SeeAlso(value={PublishGCPubSub.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
@CapabilityDescription(value=" Consumes messages from the configured Google Cloud PubSub subscription. The 'Batch Size' property specified the maximum\n number of messages that will be pulled from the subscription in a single request. The 'Processing Strategy' property\n specifies if each message should be its own FlowFile or if messages should be grouped into a single FlowFile. Using the\n Demarcator strategy will provide best throughput when the format allows it. Using Record allows to convert data format\n as well as doing schema enforcement. Using the FlowFile strategy will generate one FlowFile per message and will have\n the message's attributes as FlowFile attributes.\n")
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.ackId", description="Acknowledgement Id of the consumed Google Cloud PubSub message"), @WritesAttribute(attribute="gcp.pubsub.messageSize", description="Serialized size of the consumed Google Cloud PubSub message"), @WritesAttribute(attribute="gcp.pubsub.attributesCount", description="Number of attributes the consumed PubSub message has, if any"), @WritesAttribute(attribute="gcp.pubsub.publishTime", description="Timestamp value when the message was published"), @WritesAttribute(attribute="gcp.pubsub.subscription", description="Name of the PubSub subscription"), @WritesAttribute(attribute="Dynamic Attributes", description="Other than the listed attributes, this processor may write zero or more attributes, if the original Google Cloud Publisher client added any attributes to the message while sending")})
public class ConsumeGCPubSub
extends AbstractGCPubSubProcessor {
    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder().name("Subscription").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).description("Name of the Google Cloud Pub/Sub Subscription").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    public static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder().name("Processing Strategy").description("Strategy for processing PubSub Records and writing serialized output to FlowFiles").required(true).allowableValues(ProcessingStrategy.class).defaultValue(ProcessingStrategy.FLOW_FILE.getValue()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("The Record Reader to use for incoming messages").identifiesControllerService(RecordReaderFactory.class).required(true).dependsOn(PROCESSING_STRATEGY, (DescribedValue)ProcessingStrategy.RECORD, new DescribedValue[0]).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("The Record Writer to use in order to serialize the outgoing FlowFiles").identifiesControllerService(RecordSetWriterFactory.class).required(true).dependsOn(PROCESSING_STRATEGY, (DescribedValue)ProcessingStrategy.RECORD, new DescribedValue[0]).build();
    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder().name("Output Strategy").description("The format used to output the Kafka Record into a FlowFile Record.").required(true).defaultValue((DescribedValue)OutputStrategy.USE_VALUE).allowableValues(OutputStrategy.class).dependsOn(PROCESSING_STRATEGY, (DescribedValue)ProcessingStrategy.RECORD, new DescribedValue[0]).build();
    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder().name("Message Demarcator").required(true).addValidator(Validator.VALID).description("Since the PubSub client receives messages in batches, this Processor has an option to output FlowFiles\nwhich contains all the messages in a single batch. This property allows you to provide a string\n(interpreted as UTF-8) to use for demarcating apart multiple messages. To enter special character\nsuch as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.\n").dependsOn(PROCESSING_STRATEGY, (DescribedValue)ProcessingStrategy.DEMARCATOR, new DescribedValue[0]).build();
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse failure").description("If configured to use a Record Reader, a PubSub message that cannot be parsed using the configured Record Reader will be routed to this relationship").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(GCP_CREDENTIALS_PROVIDER_SERVICE, PROJECT_ID, SUBSCRIPTION, BATCH_SIZE_THRESHOLD, PROCESSING_STRATEGY, RECORD_READER, RECORD_WRITER, OUTPUT_STRATEGY, MESSAGE_DEMARCATOR, API_ENDPOINT, PROXY_CONFIGURATION_SERVICE);
    private static final Set<Relationship> SUCCESS_RELATIONSHIP = Set.of(REL_SUCCESS);
    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
    protected SubscriberStub subscriber = null;
    private PullRequest pullRequest;
    protected volatile OutputStrategy outputStrategy;
    protected volatile ProcessingStrategy processingStrategy;
    private volatile boolean useReader;
    protected volatile String demarcatorValue;
    private final AtomicReference<Exception> storedException = new AtomicReference();

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.useReader ? SUCCESS_FAILURE_RELATIONSHIPS : SUCCESS_RELATIONSHIP;
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.equals((Object)RECORD_READER)) {
            this.useReader = newValue != null;
        }
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
        this.pullRequest = PullRequest.newBuilder().setMaxMessages(batchSize.intValue()).setSubscription(this.getSubscriptionName(context, null)).build();
        try {
            this.subscriber = this.getSubscriber(context);
        }
        catch (IOException e) {
            this.storedException.set(e);
            this.getLogger().error("Failed to create Google Cloud Subscriber", (Throwable)e);
        }
        this.processingStrategy = (ProcessingStrategy)context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
        this.outputStrategy = this.processingStrategy == ProcessingStrategy.RECORD ? (OutputStrategy)context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : null;
        this.demarcatorValue = this.processingStrategy == ProcessingStrategy.DEMARCATOR ? context.getProperty(MESSAGE_DEMARCATOR).getValue() : null;
    }

    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        ArrayList<ConfigVerificationResult> results = new ArrayList<ConfigVerificationResult>();
        String subscriptionName = null;
        try {
            subscriptionName = this.getSubscriptionName(context, attributes);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Parse Subscription Name").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully parsed Subscription Name").build());
        }
        catch (ValidationException e) {
            verificationLogger.error("Failed to parse Subscription Name", (Throwable)e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Parse Subscription Name").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to parse Subscription Name: " + e.getMessage(), new Object[0])).build());
        }
        SubscriberStub subscriber = null;
        try {
            subscriber = this.getSubscriber(context);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Create Subscriber").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully created Subscriber").build());
        }
        catch (IOException e) {
            verificationLogger.error("Failed to create Subscriber", (Throwable)e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Create Subscriber").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to create Subscriber: " + e.getMessage(), new Object[0])).build());
        }
        if (subscriber != null && subscriptionName != null) {
            try {
                TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
                if (((TestIamPermissionsResponse)subscriber.testIamPermissionsCallable().call((Object)request)).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
                    results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName)).build());
                } else {
                    results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName)).build());
                }
            }
            catch (ApiException e) {
                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", (Throwable)e);
                results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage(), new Object[0])).build());
            }
        }
        return results;
    }

    @OnStopped
    public void onStopped() {
        if (this.subscriber != null) {
            this.subscriber.shutdown();
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.subscriber == null) {
            if (this.storedException.get() != null) {
                this.getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", (Throwable)this.storedException.get());
            } else {
                this.getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
            }
            context.yield();
            return;
        }
        PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call((Object)this.pullRequest);
        ArrayList<String> ackIds = new ArrayList<String>();
        String subscriptionName = this.getSubscriptionName(context, null);
        List receivedMessages = pullResponse.getReceivedMessagesList();
        switch (this.processingStrategy) {
            case RECORD: {
                this.processInputRecords(context, session, receivedMessages, subscriptionName, ackIds);
                break;
            }
            case FLOW_FILE: {
                this.processInputFlowFile(session, receivedMessages, subscriptionName, ackIds);
                break;
            }
            case DEMARCATOR: {
                this.processInputDemarcator(session, receivedMessages, subscriptionName, ackIds);
            }
        }
        session.commitAsync(() -> this.acknowledgeAcks(ackIds, subscriptionName));
    }

    @Override
    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("gcp-pubsub-subscription", SUBSCRIPTION.getName());
    }

    private void processInputDemarcator(ProcessSession session, final List<ReceivedMessage> receivedMessages, String subscriptionName, final List<String> ackIds) {
        final byte[] demarcator = this.demarcatorValue == null ? new byte[]{} : this.demarcatorValue.getBytes(StandardCharsets.UTF_8);
        FlowFile flowFile = session.create();
        try {
            flowFile = session.write(flowFile, new OutputStreamCallback(){

                public void process(OutputStream out) throws IOException {
                    for (ReceivedMessage message : receivedMessages) {
                        if (!message.hasMessage()) continue;
                        out.write(message.getMessage().getData().toByteArray());
                        out.write(demarcator);
                        ackIds.add(message.getAckId());
                    }
                }
            });
            session.putAttribute(flowFile, "gcp.pubsub.subscription", subscriptionName);
        }
        catch (Exception e) {
            ackIds.clear();
            session.remove(flowFile);
            throw new ProcessException("Failed to write batch of messages in FlowFile", (Throwable)e);
        }
        if (flowFile.getSize() > 0L) {
            session.putAttribute(flowFile, "record.count", String.valueOf(ackIds.size()));
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, subscriptionName);
            session.adjustCounter("Records Received from " + subscriptionName, (long)ackIds.size(), false);
        } else {
            session.remove(flowFile);
        }
    }

    private void processInputFlowFile(ProcessSession session, List<ReceivedMessage> receivedMessages, String subscriptionName, List<String> ackIds) {
        for (ReceivedMessage message : receivedMessages) {
            if (!message.hasMessage()) continue;
            FlowFile flowFile = session.create();
            HashMap<String, String> attributes = new HashMap<String, String>();
            attributes.put("gcp.pubsub.ackId", message.getAckId());
            attributes.put("gcp.pubsub.messageSize", String.valueOf(message.getSerializedSize()));
            attributes.put("gcp.pubsub.messageId", message.getMessage().getMessageId());
            attributes.put("gcp.pubsub.attributesCount", String.valueOf(message.getMessage().getAttributesCount()));
            attributes.put("gcp.pubsub.publishTime", String.valueOf(message.getMessage().getPublishTime().getSeconds()));
            attributes.put("gcp.pubsub.subscription", subscriptionName);
            attributes.putAll(message.getMessage().getAttributesMap());
            flowFile = session.putAllAttributes(flowFile, attributes);
            flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
            ackIds.add(message.getAckId());
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, subscriptionName);
        }
    }

    private void processInputRecords(ProcessContext context, ProcessSession session, List<ReceivedMessage> receivedMessages, String subscriptionName, List<String> ackIds) {
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        AbstractPubSubMessageConverter converter = switch (this.outputStrategy) {
            default -> throw new MatchException(null, null);
            case OutputStrategy.USE_VALUE -> new RecordStreamPubSubMessageConverter(readerFactory, writerFactory, this.getLogger());
            case OutputStrategy.USE_WRAPPER -> new WrapperRecordStreamPubSubMessageConverter(readerFactory, writerFactory, this.getLogger());
        };
        converter.toFlowFiles(session, receivedMessages, ackIds, subscriptionName);
    }

    private void acknowledgeAcks(Collection<String> ackIds, String subscriptionName) {
        if (ackIds == null || ackIds.isEmpty()) {
            return;
        }
        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().addAllAckIds(ackIds).setSubscription(subscriptionName).build();
        this.subscriber.acknowledgeCallable().call((Object)acknowledgeRequest);
    }

    private String getSubscriptionName(ProcessContext context, Map<String, String> additionalAttributes) {
        String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
        String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
        if (subscriptionName.contains("/")) {
            return ProjectSubscriptionName.parse((String)subscriptionName).toString();
        }
        return ProjectSubscriptionName.of((String)projectId, (String)subscriptionName).toString();
    }

    protected SubscriberStub getSubscriber(ProcessContext context) throws IOException {
        String endpoint = context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
        SubscriberStubSettings.Builder subscriberBuilder = (SubscriberStubSettings.Builder)((SubscriberStubSettings.Builder)((SubscriberStubSettings.Builder)SubscriberStubSettings.newBuilder().setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context)))).setTransportChannelProvider(this.getTransportChannelProvider(context))).setEndpoint(endpoint);
        return GrpcSubscriberStub.create((SubscriberStubSettings)subscriberBuilder.build());
    }
}

