/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.MessageFluxWrapper;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;

/**
 * Receives messages for a single partition from a {@link MessageFluxWrapper}, converts each one into a
 * {@link PartitionEvent}, and remembers the offset of the most recent event that carried one.
 *
 * <p>The stream returned by {@link #receive()} is assembled once in the constructor; {@link #close()} cancels the
 * underlying message flux.</p>
 */
class EventHubPartitionAsyncConsumer implements AutoCloseable {
    private static final ClientLogger LOGGER = new ClientLogger(EventHubPartitionAsyncConsumer.class);

    /** Flipped to {@code true} by the first {@link #close()} call so that later calls do nothing. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();

    /**
     * Most recent last-enqueued-event information. Stays {@code null} when tracking is disabled, because it is only
     * ever assigned when {@link #trackLastEnqueuedEventProperties} is {@code true}.
     */
    private final AtomicReference<LastEnqueuedEventProperties> lastEnqueuedEventProperties = new AtomicReference<>();
    private final MessageFluxWrapper messageFlux;
    private final MessageSerializer messageSerializer;
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final String consumerGroup;
    private final String partitionId;
    private final boolean trackLastEnqueuedEventProperties;
    private final Flux<PartitionEvent> partitionEvents;

    /** Position captured from the supplier that was in {@code currentEventPosition} at construction time. */
    private final EventPosition initialPosition;

    /** Offset of the last emitted event that had a non-null offset; {@code null} until such an event is seen. */
    private volatile Long currentOffset;

    /**
     * Creates a consumer for one partition.
     *
     * <p>The position supplier currently held by {@code currentEventPosition} is invoked once to capture the initial
     * position. The reference is then overwritten with a supplier that returns a position built from the last
     * observed offset, or the initial position when no offset has been observed yet.</p>
     *
     * @param messageFlux Source of AMQP messages for the partition.
     * @param messageSerializer Serializer used to turn AMQP messages into {@link EventData} and
     *     {@link LastEnqueuedEventProperties}.
     * @param fullyQualifiedNamespace Namespace placed in each event's {@link PartitionContext}.
     * @param eventHubName Event hub name placed in each event's {@link PartitionContext}.
     * @param consumerGroup Consumer group placed in each event's {@link PartitionContext}.
     * @param partitionId Partition identifier placed in each event's {@link PartitionContext}.
     * @param currentEventPosition Holder of the supplier for the current position; read once and then replaced.
     * @param trackLastEnqueuedEventProperties Whether last-enqueued-event information is extracted from each
     *     message and attached to the emitted events.
     * @throws NullPointerException if {@code messageFlux}, {@code currentEventPosition}, the supplier it holds, or
     *     the position produced by that supplier is {@code null}.
     */
    EventHubPartitionAsyncConsumer(MessageFluxWrapper messageFlux, MessageSerializer messageSerializer, String fullyQualifiedNamespace, String eventHubName, String consumerGroup, String partitionId, AtomicReference<Supplier<EventPosition>> currentEventPosition, boolean trackLastEnqueuedEventProperties) {
        // Validate everything that would otherwise fail part-way through construction, so the caller's
        // position reference is never overwritten by a constructor call that ends up throwing.
        Objects.requireNonNull(currentEventPosition, "'currentEventPosition' cannot be null.");
        Supplier<EventPosition> initialPositionSupplier = Objects.requireNonNull(currentEventPosition.get(), "'currentEventPosition.get()' cannot be null.");
        this.initialPosition = Objects.requireNonNull(initialPositionSupplier.get(), "'currentEventPosition.get().get()' cannot be null.");
        this.messageFlux = Objects.requireNonNull(messageFlux, "'messageFlux' cannot be null.");
        this.messageSerializer = messageSerializer;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroup = consumerGroup;
        this.partitionId = partitionId;
        this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
        if (trackLastEnqueuedEventProperties) {
            // Seed with an all-null instance so events emitted before any properties arrive still carry an object.
            this.lastEnqueuedEventProperties.set(new LastEnqueuedEventProperties(null, null, null, null));
        }
        currentEventPosition.set(() -> {
            // Read the volatile field once so the null check and the conversion see the same value.
            Long offset = this.currentOffset;
            return offset == null ? this.initialPosition : EventPosition.fromOffset(offset);
        });
        this.partitionEvents = messageFlux.flux().map(this::onMessageReceived).doOnNext(this::recordOffset);
    }

    /**
     * Marks this consumer as disposed and cancels the message flux unless it already reports itself as terminated.
     * Only the first invocation has any effect.
     */
    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        if (!this.messageFlux.isTerminated()) {
            this.messageFlux.cancel();
        }
        LOGGER.atInfo().addKeyValue("partitionId", this.partitionId).log("Closed consumer.");
    }

    /**
     * Gets the stream of events for this partition.
     *
     * @return The {@link Flux} built in the constructor; the same instance is returned on every call.
     */
    Flux<PartitionEvent> receive() {
        return this.partitionEvents;
    }

    /**
     * Remembers the offset of an emitted event, or logs a warning when the event has no offset.
     *
     * @param event The event that is about to be delivered downstream.
     */
    private void recordOffset(PartitionEvent event) {
        Long offset = event.getData().getOffset();
        if (offset != null) {
            this.currentOffset = offset;
        } else {
            LOGGER.atWarning().addKeyValue("partitionId", event.getPartitionContext().getPartitionId()).addKeyValue("consumerGroup", event.getPartitionContext().getConsumerGroup()).addKeyValue("data", () -> event.getData().getBodyAsString()).log("Offset for received event should not be null.");
        }
    }

    /**
     * Converts an AMQP message into a {@link PartitionEvent}.
     *
     * <p>When tracking is enabled and the serializer yields last-enqueued-event information for the message, a copy
     * of that information replaces the stored value before the event is created.</p>
     *
     * @param message The received AMQP message.
     * @return An event containing the partition context, the deserialized {@link EventData}, and the currently
     *     stored last-enqueued-event information ({@code null} when tracking is disabled).
     */
    private PartitionEvent onMessageReceived(Message message) {
        EventData event = this.messageSerializer.deserialize(message, EventData.class);
        if (this.trackLastEnqueuedEventProperties) {
            LastEnqueuedEventProperties enqueuedEventProperties = this.messageSerializer.deserialize(message, LastEnqueuedEventProperties.class);
            if (enqueuedEventProperties != null) {
                LastEnqueuedEventProperties updated = new LastEnqueuedEventProperties(enqueuedEventProperties.getSequenceNumber(), enqueuedEventProperties.getOffset(), enqueuedEventProperties.getEnqueuedTime(), enqueuedEventProperties.getRetrievalTime());
                this.lastEnqueuedEventProperties.set(updated);
            }
        }
        PartitionContext partitionContext = new PartitionContext(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.partitionId);
        return new PartitionEvent(partitionContext, event, this.lastEnqueuedEventProperties.get());
    }
}

