/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connector.googlepubsub.internal.source;

import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.mulesoft.connector.googlepubsub.internal.config.PubSubConfiguration;
import com.mulesoft.connector.googlepubsub.internal.connection.PubSubConnection;
import com.mulesoft.connector.googlepubsub.internal.connection.provider.FlowControlParameters;
import com.mulesoft.connector.googlepubsub.internal.error.exception.GooglePubSubRuntimeException;
import com.mulesoft.connector.googlepubsub.internal.metadata.MessageListenerMetadataResolver;
import com.mulesoft.connector.googlepubsub.internal.operation.params.SubscriptionIdentifier;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.metadata.MetadataKeyId;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.OnBackPressure;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.BackPressureContext;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.runtime.source.SourceCompletionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message source that listens on a Google Pub/Sub subscription and pushes every received
 * message into the owning flow.
 *
 * <p>Each message is dispatched together with its {@link AckReplyConsumer}, stored in the
 * {@link SourceCallbackContext} under {@link #CONSUMER}. The message is ACK-ed when the flow
 * completes successfully and NACK-ed when the flow fails or when back pressure is applied.
 */
@Summary("Listen for messages on selected subscription")
@DisplayName("On message listener")
@Alias("message-listener")
@MediaType("*/*")
@ClusterSupport(SourceClusterSupport.DEFAULT_ALL_NODES)
@BackPressure(defaultMode = BackPressureMode.WAIT, supportedModes = {BackPressureMode.WAIT, BackPressureMode.DROP, BackPressureMode.FAIL})
@MetadataScope(outputResolver = MessageListenerMetadataResolver.class, attributesResolver = MessageListenerMetadataResolver.class)
public class MessageListenerSource
extends Source<InputStream, Map<String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerSource.class);

    /** Name of the callback-context variable that holds the message's {@link AckReplyConsumer}. */
    public static final String CONSUMER = "consumer";

    @Config
    private PubSubConfiguration configuration;

    @Connection
    private ConnectionProvider<PubSubConnection> connectionProvider;

    /** Connection obtained in {@link #onStart}; released again in {@link #onStop}. */
    private PubSubConnection pubSubConnection;

    /** Streaming subscriber created in {@link #onStart}; {@code null} while the source is stopped. */
    private Subscriber subscriber;

    @MetadataKeyId
    @ParameterGroup(name="identifier")
    private SubscriptionIdentifier identifier;

    @Optional(defaultValue="5")
    @DisplayName("Consumer count")
    @Parameter
    @Placement(tab="Advanced")
    @Summary("Provides specified amount of executor service for processing messages.")
    private int consumerCount;

    @ParameterGroup(name="Flow control parameters")
    private FlowControlParameters flowControlParameters;

    /**
     * Obtains a connection, builds the subscriber and blocks until it is running.
     *
     * <p>If anything fails after the connection was obtained, the subscriber is stopped and the
     * connection is handed back to the provider before the failure is rethrown, so a failed
     * start does not leak the connection.
     *
     * @param sourceCallback callback used to dispatch messages and to report connection failures
     * @throws MuleException if the connection cannot be obtained
     */
    @Override
    public void onStart(final SourceCallback<InputStream, Map<String, String>> sourceCallback) throws MuleException {
        this.pubSubConnection = this.connectionProvider.connect();
        try {
            this.subscriber = this.initSubscriber(sourceCallback);
            this.subscriber.addListener(new ApiService.Listener() {

                @Override
                public void failed(ApiService.State from, Throwable failure) {
                    if (from == ApiService.State.STARTING) {
                        // NOTE(review): an exception thrown from a service listener is presumably
                        // handled by the listener dispatcher rather than propagated to onStart;
                        // the start failure itself surfaces through awaitRunning() — confirm.
                        throw new GooglePubSubRuntimeException(failure);
                    }
                    // Failure after startup: report it as a connection problem to the runtime.
                    sourceCallback.onConnectionException(new ConnectionException(failure));
                }
            }, MoreExecutors.directExecutor());
            this.subscriber.startAsync().awaitRunning();
        }
        catch (RuntimeException e) {
            this.stopSubscriber();
            this.releaseConnection();
            throw e;
        }
        logger.debug("Listening for messages on: {}", this.identifier.getSubscriptionName());
    }

    /**
     * Requests the subscriber to stop and returns the connection to its provider.
     *
     * <p>Safe to call when the source never started or was already stopped: both the subscriber
     * and the connection are released at most once.
     */
    @Override
    public void onStop() {
        logger.debug("Stopping async subscriber for subscription: {}", this.identifier.getSubscriptionName());
        try {
            this.stopSubscriber();
        }
        finally {
            // The connection must be released even if stopping the subscriber fails.
            this.releaseConnection();
        }
    }

    /**
     * NACKs the message whose flow execution failed.
     *
     * @param sourceCallbackContext context carrying the {@link AckReplyConsumer} of the message
     * @throws IllegalStateException if the context holds no consumer
     */
    @OnError
    public void onError(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Error processing message. Message will be NACK-ed");
        AckReplyConsumer ackReplyConsumer = MessageListenerSource.requireConsumer(sourceCallbackContext, "Consumer for message NACK not found!");
        try {
            ackReplyConsumer.nack();
        }
        catch (Exception e) {
            // Best effort: log with the stack trace and carry on.
            logger.warn("An error {} occurred during the automatic not-acknowledgement of the current message. \n Continuing...", e.getMessage(), e);
        }
    }

    /**
     * ACKs the message whose flow execution completed successfully.
     *
     * @param sourceCallbackContext context carrying the {@link AckReplyConsumer} of the message
     * @throws IllegalStateException if the context holds no consumer
     */
    @OnSuccess
    public void onSuccess(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Message processed successfully.");
        AckReplyConsumer ackReplyConsumer = MessageListenerSource.requireConsumer(sourceCallbackContext, "Consumer for message ACK not found!");
        try {
            ackReplyConsumer.ack();
        }
        catch (Exception e) {
            // Best effort: log with the stack trace and carry on.
            logger.warn("An error {} occurred during the automatic acknowledgement of the current message. \n Continuing...", e.getMessage(), e);
        }
    }

    /**
     * NACKs a message that the flow could not accept because of back pressure.
     *
     * <p>Unlike {@link #onError}, a missing consumer is only logged here, not rethrown.
     *
     * @param ctx back-pressure context giving access to the message's callback context
     * @param completionCallback completion callback supplied by the runtime; not used here
     */
    @OnBackPressure
    public void onBackPressure(BackPressureContext ctx, SourceCompletionCallback completionCallback) {
        try {
            logger.debug("Flow is unable to accept new messages at this time. Message will be NACK-ed");
            AckReplyConsumer ackReplyConsumer = MessageListenerSource.requireConsumer(ctx.getSourceCallbackContext(), "Consumer for message NACK not found!");
            ackReplyConsumer.nack();
        }
        catch (Exception e) {
            logger.warn("An error {} occurred during the automatic not-acknowledgement of the current message. \n Continuing...", e.getMessage(), e);
        }
    }

    /**
     * Invoked once message processing has terminated; only emits a trace log entry.
     *
     * @param sourceCallbackContext context of the terminated message (unused)
     */
    @OnTerminate
    public void onTerminate(SourceCallbackContext sourceCallbackContext) {
        logger.trace("Message was processed by messageListenerSource flow");
    }

    /**
     * Builds (but does not start) the subscriber for the configured subscription.
     *
     * <p>The receiver turns each Pub/Sub message into a {@link Result} whose payload is the
     * message data stream and whose attributes are {@code messageId}, {@code publishTime} and
     * {@code orderingKey} followed by the message's own attributes. Because the message's
     * attributes are added last, they overwrite those three entries on a key clash.
     *
     * @param sourceCallback callback that receives each converted message
     * @return a configured, not yet started subscriber
     */
    private Subscriber initSubscriber(SourceCallback<InputStream, Map<String, String>> sourceCallback) {
        MessageReceiver receiver = (pubsubMessage, consumer) -> {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("messageId", pubsubMessage.getMessageId());
            attributes.put("publishTime", pubsubMessage.getPublishTime().toString());
            attributes.put("orderingKey", pubsubMessage.getOrderingKey());
            attributes.putAll(pubsubMessage.getAttributesMap());

            // The consumer travels with the message so the completion handlers can ACK/NACK it.
            SourceCallbackContext ctx = sourceCallback.createContext();
            ctx.addVariable(CONSUMER, consumer);

            Result<InputStream, Map<String, String>> result = Result.<InputStream, Map<String, String>>builder()
                    .output(pubsubMessage.getData().newInput())
                    .attributes(attributes)
                    .build();
            sourceCallback.handle(result, ctx);
        };

        String subscriptionPath = String.format("projects/%s/subscriptions/%s", this.identifier.getProjectId(), this.identifier.getSubscriptionName());

        FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
                .setLimitExceededBehavior(FlowController.LimitExceededBehavior.valueOf((String)this.flowControlParameters.getLimitExceededBehavior().getValue()))
                .setMaxOutstandingElementCount(Long.valueOf(this.flowControlParameters.getMaxOutstandingElementCount()))
                .setMaxOutstandingRequestBytes(Long.valueOf(this.flowControlParameters.getMaxOutstandingRequestSizeInBytes()))
                .build();

        ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
                .setExecutorThreadCount(this.consumerCount)
                .build();

        return Subscriber.newBuilder(subscriptionPath, receiver)
                .setFlowControlSettings(flowControlSettings)
                .setCredentialsProvider(this.pubSubConnection.getCredentialsProvider())
                .setExecutorProvider(executorProvider)
                .build();
    }

    /**
     * Looks up the {@link AckReplyConsumer} stored under {@link #CONSUMER}.
     *
     * @param context callback context of the message being completed
     * @param missingMessage message of the exception thrown when no consumer is stored
     * @return the consumer stored in the context
     * @throws IllegalStateException if the context holds no consumer
     */
    private static AckReplyConsumer requireConsumer(SourceCallbackContext context, String missingMessage) {
        return (AckReplyConsumer)context.getVariable(CONSUMER).orElseThrow(() -> new IllegalStateException(missingMessage));
    }

    /** Requests an asynchronous stop of the current subscriber, if any, and forgets it. */
    private void stopSubscriber() {
        Subscriber current = this.subscriber;
        this.subscriber = null;
        if (current != null) {
            current.stopAsync();
        }
    }

    /**
     * Hands the current connection, if any, back to the provider and forgets it.
     *
     * <p>A failure while disconnecting is logged instead of rethrown so it cannot mask the
     * error that triggered the cleanup or abort the stop sequence.
     */
    private void releaseConnection() {
        PubSubConnection current = this.pubSubConnection;
        this.pubSubConnection = null;
        if (current == null) {
            return;
        }
        try {
            this.connectionProvider.disconnect(current);
        }
        catch (RuntimeException e) {
            logger.warn("An error {} occurred while releasing the Pub/Sub connection. \n Continuing...", e.getMessage(), e);
        }
    }
}

