/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.opentracing.tag.Tags;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.EventBusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class EventBusService
extends AbstractVerticle {
    protected final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    protected Tracer tracer = NoopTracerFactory.create();
    private MessageConsumer<JsonObject> requestConsumer;

    @Autowired(required=false)
    public final void setTracer(Tracer opentracingTracer) {
        this.log.info("using OpenTracing Tracer implementation [{}]", (Object)opentracingTracer.getClass().getName());
        this.tracer = Objects.requireNonNull(opentracingTracer);
    }

    public final void start(Promise<Void> startPromise) {
        Promise localStart = Promise.promise();
        localStart.future().map(ok -> {
            this.log.info("service started successfully");
            return ok;
        }).onComplete(startPromise);
        this.registerConsumer();
        this.doStart((Promise<Void>)localStart);
    }

    protected void doStart(Promise<Void> startPromise) {
        startPromise.complete();
    }

    protected abstract String getEventBusAddress();

    public final void stop(Promise<Void> stopPromise) {
        Promise localStop = Promise.promise();
        localStop.future().map(ok -> {
            this.log.info("service stopped successfully");
            return ok;
        }).onComplete(stopPromise);
        if (this.requestConsumer != null) {
            this.requestConsumer.unregister();
            this.log.info("unregistered request consumer from event bus");
        }
        this.doStop((Promise<Void>)localStop);
    }

    protected void doStop(Promise<Void> stopPromise) {
        stopPromise.complete();
    }

    private void registerConsumer() {
        this.requestConsumer = this.vertx.eventBus().consumer(this.getEventBusAddress());
        this.requestConsumer.handler(this::processRequestMessage);
        this.log.info("listening on event bus [address: {}] for requests", (Object)this.getEventBusAddress());
    }

    private void processRequestMessage(Message<JsonObject> msg) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("received request message: {}", (Object)((JsonObject)msg.body()).encodePrettily());
        }
        EventBusMessage request = EventBusMessage.fromJson((JsonObject)((JsonObject)msg.body()));
        SpanContext spanContext = TracingHelper.extractSpanContext((Tracer)this.tracer, (MultiMap)msg.headers());
        request.setSpanContext(spanContext);
        this.processRequest(request).recover(t -> {
            this.log.debug("cannot process request [operation: {}]: {}", new Object[]{request.getOperation(), t.getMessage(), t});
            int status = ServiceInvocationException.extractStatusCode((Throwable)t);
            return Future.succeededFuture((Object)request.getResponse(status));
        }).map(response -> {
            if (response.getReplyToAddress() == null) {
                this.log.debug("sending response as direct reply to request [operation: {}]", (Object)request.getOperation());
                msg.reply((Object)response.toJson());
            } else if (response.hasResponseProperties()) {
                this.log.debug("sending response [operation: {}, reply-to: {}]", (Object)request.getOperation(), (Object)request.getReplyToAddress());
                this.vertx.eventBus().send(request.getReplyToAddress(), (Object)response.toJson());
            } else {
                this.log.warn("discarding response lacking correlation ID or operation");
            }
            return null;
        });
    }

    protected abstract Future<EventBusMessage> processRequest(EventBusMessage var1);

    protected static final <T> T getTypesafeValueForField(Class<T> clazz, JsonObject payload, String field) {
        Objects.requireNonNull(clazz);
        Objects.requireNonNull(payload);
        Objects.requireNonNull(field);
        Object result = payload.getValue(field);
        if (clazz.isInstance(result)) {
            return clazz.cast(result);
        }
        return null;
    }

    protected static final <T> T removeTypesafeValueForField(Class<T> clazz, JsonObject payload, String field) {
        Objects.requireNonNull(clazz);
        Objects.requireNonNull(payload);
        Objects.requireNonNull(field);
        Object result = payload.remove(field);
        if (clazz.isInstance(result)) {
            return clazz.cast(result);
        }
        return null;
    }

    protected final JsonObject getRequestPayload(JsonObject payload) {
        return Optional.ofNullable(payload).map(pl -> {
            Object obj = pl.getValue("enabled");
            if (obj instanceof Boolean) {
                return pl;
            }
            this.log.trace("adding 'enabled=true' property to request payload");
            return pl.copy().put("enabled", Boolean.TRUE);
        }).orElseGet(() -> new JsonObject().put("enabled", Boolean.TRUE));
    }

    protected Future<EventBusMessage> finishSpanOnFutureCompletion(Span span, Future<EventBusMessage> resultFuture) {
        return resultFuture.compose(eventBusMessage -> {
            Tags.HTTP_STATUS.set(span, eventBusMessage.getStatus());
            if (eventBusMessage.hasErrorStatus()) {
                Tags.ERROR.set(span, Boolean.valueOf(true));
            }
            span.finish();
            return Future.succeededFuture((Object)eventBusMessage);
        }).recover(t -> {
            Tags.HTTP_STATUS.set(span, Integer.valueOf(ServiceInvocationException.extractStatusCode((Throwable)t)));
            TracingHelper.logError((Span)span, (Throwable)t);
            span.finish();
            return Future.failedFuture((Throwable)t);
        });
    }
}

