/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} implementation that registers itself with a {@link StreamEnvironment}
 * and uses a tracking {@link Client} to store and query offsets.
 *
 * <p>The tracking client and the {@link Status} are mutated by the environment through
 * {@link #setClient(Client)}, {@link #unavailable()} and {@link #running()}; both are
 * {@code volatile} so that {@link #store(long)} and {@link #lastStoredOffset()} observe
 * the latest values.
 */
class StreamConsumer
implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
    /** Callback returned by the environment when this consumer was registered. */
    private final Runnable closingCallback;
    /** Callback of the tracking registration; a no-op when tracking is disabled. */
    private final Runnable closingTrackingCallback;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final String name;
    private final String stream;
    private final StreamEnvironment environment;
    private volatile Client trackingClient;
    private volatile Status status;
    /** Invoked with every offset passed to {@link #store(long)}; a no-op when tracking is disabled. */
    private final LongConsumer trackingCallback;

    /**
     * Creates the consumer and registers it with the environment.
     *
     * <p>When tracking is enabled, the consumer is first registered as a tracking consumer
     * and, if the registration provides a post-processing callback, the message handler is
     * wrapped so that the callback runs after each handled message.
     *
     * @param stream the stream to consume from
     * @param offsetSpecification where to start consuming
     * @param messageHandler the application message handler
     * @param name the consumer name, used as the reference when storing/querying offsets
     * @param environment the environment this consumer registers with
     * @param trackingConfiguration the offset tracking configuration
     * @throws RuntimeException any runtime exception raised during registration is rethrown
     *     after the consumer has been flagged as closed
     */
    StreamConsumer(String stream, OffsetSpecification offsetSpecification, MessageHandler messageHandler, String name, StreamEnvironment environment, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        try {
            MessageHandler messageHandlerWithOrWithoutTracking;
            this.name = name;
            this.stream = stream;
            this.environment = environment;
            if (trackingConfiguration.enabled()) {
                StreamEnvironment.TrackingConsumerRegistration trackingConsumerRegistration = environment.registerTrackingConsumer(this, trackingConfiguration);
                this.closingTrackingCallback = trackingConsumerRegistration.closingCallback();
                java.util.function.Consumer<MessageHandler.Context> postMessageProcessingCallback = trackingConsumerRegistration.postMessageProcessingCallback();
                // Only wrap the handler when there is something to run after it.
                messageHandlerWithOrWithoutTracking = postMessageProcessingCallback == null ? messageHandler : (context, message) -> {
                    messageHandler.handle(context, message);
                    postMessageProcessingCallback.accept(context);
                };
                this.trackingCallback = trackingConsumerRegistration.trackingCallback();
            } else {
                this.closingTrackingCallback = () -> {};
                this.trackingCallback = Utils.NO_OP_LONG_CONSUMER;
                messageHandlerWithOrWithoutTracking = messageHandler;
            }
            this.closingCallback = environment.registerConsumer(this, stream, offsetSpecification, this.name, messageHandlerWithOrWithoutTracking);
            this.status = Status.RUNNING;
        }
        catch (RuntimeException e) {
            // A consumer that failed to register must never be reported as open.
            this.closed.set(true);
            throw e;
        }
    }

    /**
     * Notifies the tracking callback and, when the consumer is running and has a tracking
     * client, asks that client to store the offset. Failures to store are logged at debug
     * level and not propagated.
     *
     * @param offset the offset to store
     */
    @Override
    public void store(long offset) {
        this.trackingCallback.accept(offset);
        Client client = this.availableTrackingClient();
        if (client == null) {
            if (this.canTrack()) {
                // Running but no client: unavailable() may have just cleared it.
                LOGGER.debug("Error while trying to store offset: {}", (Object)"no tracking client available");
            }
            return;
        }
        try {
            client.storeOffset(this.name, this.stream, offset);
        }
        catch (Exception e) {
            LOGGER.debug("Error while trying to store offset: {}", (Object)e.getMessage());
        }
    }

    private boolean canTrack() {
        return this.status == Status.RUNNING;
    }

    /**
     * Returns a single snapshot of the tracking client when the consumer is running,
     * {@code null} otherwise (or when no client is currently set). Reading the volatile
     * field once avoids dereferencing a client that gets cleared concurrently.
     */
    private Client availableTrackingClient() {
        if (!this.canTrack()) {
            return null;
        }
        return this.trackingClient;
    }

    /**
     * Closes the consumer: removes it from the environment and runs the closing callbacks.
     * Only the first call has an effect.
     */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.environment.removeConsumer(this);
            // closeFromEnvironment() logs the successful closing itself.
            this.closeFromEnvironment();
        }
    }

    /**
     * Runs the consumer and tracking closing callbacks and marks the consumer as closed,
     * without asking the environment to remove it.
     *
     * <p>The closed flag and status are updated even if a callback throws, so the consumer
     * is never left in the {@link Status#RUNNING} state after a failed closing; the
     * exception is still propagated to the caller.
     */
    void closeFromEnvironment() {
        try {
            LOGGER.debug("Calling consumer closing callback");
            this.closingCallback.run();
            LOGGER.debug("Calling tracking consumer closing callback (may be no-op)");
            this.closingTrackingCallback.run();
        }
        finally {
            this.closed.set(true);
            this.status = Status.CLOSED;
        }
        LOGGER.debug("Closed consumer successfully");
    }

    /**
     * Marks the consumer as closed and removes it from the environment without running
     * the closing callbacks. Only the first closing call has an effect.
     */
    void closeAfterStreamDeletion() {
        if (this.closed.compareAndSet(false, true)) {
            this.environment.removeConsumer(this);
            this.status = Status.CLOSED;
        }
    }

    /** @return {@code true} as long as the consumer has not been flagged as closed */
    boolean isOpen() {
        return !this.closed.get();
    }

    /**
     * Sets the client used to store and query offsets.
     *
     * @param client the tracking client
     */
    synchronized void setClient(Client client) {
        this.trackingClient = client;
    }

    /** Flags the consumer as not available and clears the tracking client. */
    synchronized void unavailable() {
        this.status = Status.NOT_AVAILABLE;
        this.trackingClient = null;
    }

    /** Flags the consumer as running. */
    void running() {
        this.status = Status.RUNNING;
    }

    /**
     * Queries the tracking client for the offset stored under this consumer's name.
     *
     * @return the value returned by the client, or {@code 0} when the consumer is not
     *     running, has no tracking client, or the query fails
     */
    long lastStoredOffset() {
        Client client = this.availableTrackingClient();
        if (client == null) {
            return 0L;
        }
        try {
            return client.queryOffset(this.name, this.stream);
        }
        catch (Exception e) {
            return 0L;
        }
    }

    /** @return the name of the stream this consumer was created for */
    String stream() {
        return this.stream;
    }

    /** Lifecycle state of the consumer; offsets are only stored/queried while {@link #RUNNING}. */
    enum Status {
        RUNNING,
        NOT_AVAILABLE,
        CLOSED
    }
}

