package org.kie.kogito.addon.quarkus.messaging.common;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadataBuilder;
import jakarta.inject.Inject;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventEmitter;
import org.kie.kogito.event.EventMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.class */
public abstract class AbstractQuarkusCloudEventEmitter<M> implements EventEmitter {
    private static final Logger logger = LoggerFactory.getLogger(AbstractQuarkusCloudEventEmitter.class);

    @Inject
    MessageDecoratorProvider messageDecorator;
    private CloudEventMarshaller<M> cloudEventMarshaller;
    private EventMarshaller<M> eventMarshaller;

    /* JADX WARN: Multi-variable type inference failed */
    public CompletionStage<Void> emit(DataEvent<?> dataEvent) {
        logger.debug("publishing event {}", dataEvent);
        try {
            Message<M> withNack = this.messageDecorator.decorate(getMessage(dataEvent)).withNack(th -> {
                logger.error("Error publishing event {}", dataEvent, th);
                return CompletableFuture.completedFuture(null);
            });
            emit(withNack);
            return (CompletionStage) withNack.getAck().get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    protected void setEventDataMarshaller(EventMarshaller<M> eventMarshaller) {
        this.eventMarshaller = eventMarshaller;
    }

    protected void setCloudEventMarshaller(CloudEventMarshaller<M> cloudEventMarshaller) {
        this.cloudEventMarshaller = cloudEventMarshaller;
    }

    private <T> Optional<OutgoingCloudEventMetadata<?>> getMetadata(DataEvent<T> dataEvent) {
        if (dataEvent.getId() == null || dataEvent.getType() == null || dataEvent.getSource() == null || dataEvent.getSpecVersion() == null) {
            return Optional.empty();
        }
        OutgoingCloudEventMetadataBuilder withTimestamp = OutgoingCloudEventMetadata.builder().withId(dataEvent.getId()).withSource(dataEvent.getSource()).withType(dataEvent.getType()).withSubject(dataEvent.getSubject()).withDataContentType(dataEvent.getDataContentType()).withDataSchema(dataEvent.getDataSchema()).withSpecVersion(dataEvent.getSpecVersion().toString()).withTimestamp(dataEvent.getTime().toZonedDateTime());
        for (String str : dataEvent.getExtensionNames()) {
            withTimestamp.withExtension(str, dataEvent.getExtension(str));
        }
        return Optional.of(withTimestamp.build());
    }

    private <T> Message<M> getMessage(DataEvent<T> dataEvent) throws IOException {
        if (this.cloudEventMarshaller != null) {
            return Message.of(this.cloudEventMarshaller.marshall(dataEvent.asCloudEvent(this.cloudEventMarshaller.cloudEventDataFactory())));
        }
        if (this.eventMarshaller == null) {
            throw new IllegalStateException("Not marshaller has been set for emitter " + this);
        }
        Optional<OutgoingCloudEventMetadata<?>> metadata = getMetadata(dataEvent);
        Object marshall = this.eventMarshaller.marshall(dataEvent.getData());
        return metadata.isPresent() ? Message.of(marshall, Metadata.of(new Object[]{metadata.orElseThrow()})) : Message.of(marshall);
    }

    protected abstract void emit(Message<M> message);
}
