/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.plugin.endpoint.mock;

import io.gravitee.common.http.HttpHeader;
import io.gravitee.gateway.api.http.HttpHeaders;
import io.gravitee.gateway.reactive.api.ConnectorMode;
import io.gravitee.gateway.reactive.api.connector.endpoint.async.HttpEndpointAsyncConnector;
import io.gravitee.gateway.reactive.api.context.http.HttpExecutionContext;
import io.gravitee.gateway.reactive.api.message.DefaultMessage;
import io.gravitee.gateway.reactive.api.message.Message;
import io.gravitee.gateway.reactive.api.qos.Qos;
import io.gravitee.gateway.reactive.api.qos.QosCapability;
import io.gravitee.gateway.reactive.api.tracing.message.TracingMessageAttribute;
import io.gravitee.gateway.reactive.api.tracing.message.TracingMessageOperationType;
import io.gravitee.plugin.endpoint.mock.configuration.MockEndpointConnectorConfiguration;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MockEndpointConnector
extends HttpEndpointAsyncConnector {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MockEndpointConnector.class);
    static final Set<ConnectorMode> SUPPORTED_MODES = Set.of(ConnectorMode.PUBLISH, ConnectorMode.SUBSCRIBE);
    static final Set<Qos> SUPPORTED_QOS = Set.of(Qos.NONE, Qos.AUTO, Qos.AT_LEAST_ONCE, Qos.AT_MOST_ONCE);
    private static final Set<QosCapability> SUPPORTED_QOS_CAPABILITIES = Set.of(QosCapability.AUTO_ACK, QosCapability.MANUAL_ACK, QosCapability.RECOVER);
    private static final String ENDPOINT_ID = "mock";
    protected final MockEndpointConnectorConfiguration configuration;

    public String id() {
        return ENDPOINT_ID;
    }

    public Set<ConnectorMode> supportedModes() {
        return SUPPORTED_MODES;
    }

    public Set<Qos> supportedQos() {
        return SUPPORTED_QOS;
    }

    public Set<QosCapability> supportedQosCapabilities() {
        return SUPPORTED_QOS_CAPABILITIES;
    }

    public Completable subscribe(HttpExecutionContext ctx) {
        return Completable.fromRunnable(() -> {
            Integer messagesLimitCount = (Integer)ctx.getInternalAttribute("messages.limit.count");
            Long messagesLimitDurationMs = (Long)ctx.getInternalAttribute("messages.limit.durationMs");
            String messagesResumeLastId = (String)ctx.getInternalAttribute("messages.recovery.lastId");
            Integer maximumPublishedMessages = this.configuration.getMessageCount();
            ctx.response().messages(this.generateMessageFlow(messagesLimitCount, maximumPublishedMessages, messagesLimitDurationMs, messagesResumeLastId));
        });
    }

    public Completable publish(HttpExecutionContext ctx) {
        return Completable.defer(() -> ctx.request().onMessage(message -> {
            log.info("Received message: {}", message.content() != null ? message.content().toString() : null);
            return Maybe.just((Object)message);
        }));
    }

    private Flowable<Message> generateMessageFlow(Integer messagesLimitCount, Integer maximumPublishedMessages, Long messagesLimitDurationMs, String lastId) {
        long stateInitValue = this.getStateInitValue(lastId);
        Flowable messageFlow = Flowable.generate(() -> stateInitValue, (state, emitter) -> {
            if (!(maximumPublishedMessages != null && state >= (long)maximumPublishedMessages.intValue() || messagesLimitCount != null && state - stateInitValue >= (long)messagesLimitCount.intValue())) {
                DefaultMessage.DefaultMessageBuilder messageBuilder = DefaultMessage.builder().content(this.configuration.getMessageContent()).id(Long.toString(state));
                if (this.configuration.getHeaders() != null) {
                    HttpHeaders headers = HttpHeaders.create();
                    this.configuration.getHeaders().forEach(h -> headers.add((CharSequence)h.getName(), (CharSequence)h.getValue()));
                    messageBuilder.headers(headers);
                }
                if (this.configuration.getMetadata() != null) {
                    messageBuilder.metadata(this.configuration.getMetadata().stream().collect(Collectors.toMap(HttpHeader::getName, HttpHeader::getValue)));
                }
                messageBuilder.tracingAttributes(Map.of(TracingMessageAttribute.MESSAGING_OPERATION_NAME.key(), "receive", TracingMessageAttribute.MESSAGING_OPERATION_TYPE.key(), TracingMessageOperationType.RECEIVE.value(), TracingMessageAttribute.MESSAGING_SYSTEM.key(), ENDPOINT_ID, TracingMessageAttribute.MESSAGING_MESSAGE_BODY_SIZE.key(), messageBuilder.content() != null ? String.valueOf(messageBuilder.content().length()) : "0"));
                emitter.onNext((Object)messageBuilder.build());
            } else {
                emitter.onComplete();
            }
            return state + 1L;
        }).delay(this.configuration.getMessageInterval().longValue(), TimeUnit.MILLISECONDS).rebatchRequests(1);
        if (messagesLimitDurationMs != null) {
            messageFlow = messageFlow.take(messagesLimitDurationMs.longValue(), TimeUnit.MILLISECONDS);
        }
        return messageFlow;
    }

    private long getStateInitValue(String lastId) {
        long stateInitValue = 0L;
        if (lastId != null) {
            try {
                stateInitValue = Long.parseLong(lastId) + 1L;
            }
            catch (NumberFormatException nfe) {
                log.warn("Unable to parse lastId: {}. Setting to 0", (Object)lastId);
            }
        }
        return stateInitValue;
    }

    @Generated
    public MockEndpointConnector(MockEndpointConnectorConfiguration configuration) {
        this.configuration = configuration;
    }
}

