/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import com.google.common.base.Strings;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.PersistentStream;
import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks;
import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.streams.PersistentStreamEvent;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.configuration.Configuration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentStreamConnection {
    private static final int MAX_RETRY_INTERVAL_SECONDS = 60;
    private static final int MIN_RETRY_INTERVAL_SECONDS = 1;
    private final Logger logger = LoggerFactory.getLogger(PersistentStreamConnection.class);
    private final String streamId;
    private final Configuration configuration;
    private final PersistentStreamProperties persistentStreamProperties;
    private final AtomicReference<PersistentStream> persistentStreamHolder = new AtomicReference();
    private static final Consumer<List<? extends EventMessage>> NO_OP_CONSUMER = events -> {};
    private final AtomicReference<Consumer<List<? extends EventMessage>>> consumer = new AtomicReference<Consumer<List<? extends EventMessage>>>(NO_OP_CONSUMER);
    private final ScheduledExecutorService scheduler;
    private final int batchSize;
    private final Map<Integer, SegmentConnection> segments = new ConcurrentHashMap<Integer, SegmentConnection>();
    private final AtomicInteger retrySeconds = new AtomicInteger(1);
    private final String defaultContext;

    public PersistentStreamConnection(String streamId, Configuration configuration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize) {
        this.streamId = streamId;
        this.configuration = configuration;
        this.persistentStreamProperties = persistentStreamProperties;
        this.scheduler = scheduler;
        this.batchSize = batchSize;
        this.defaultContext = null;
    }

    public PersistentStreamConnection(String streamId, Configuration configuration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize, String defaultContext) {
        this.streamId = streamId;
        this.configuration = configuration;
        this.persistentStreamProperties = persistentStreamProperties;
        this.scheduler = scheduler;
        this.batchSize = batchSize;
        this.defaultContext = defaultContext;
    }

    public void open(Consumer<List<? extends EventMessage>> consumer) {
        if (!this.consumer.compareAndSet(NO_OP_CONSUMER, consumer)) {
            throw new IllegalStateException(String.format("%s: Persistent Stream has already been opened.", this.streamId));
        }
        this.start();
    }

    private void start() {
        AxonServerConnectionManager axonServerConnectionManager = (AxonServerConnectionManager)this.configuration.getComponent(AxonServerConnectionManager.class);
        AxonServerConfiguration axonServerConfiguration = (AxonServerConfiguration)this.configuration.getComponent(AxonServerConfiguration.class);
        String context = Strings.isNullOrEmpty((String)this.defaultContext) ? axonServerConfiguration.getContext() : this.defaultContext;
        PersistentStreamCallbacks callbacks = new PersistentStreamCallbacks(this::segmentOpened, this::segmentClosed, this::messageAvailable, this::streamClosed);
        EventChannel eventChannel = axonServerConnectionManager.getConnection(context).eventChannel();
        PersistentStream persistentStream = eventChannel.openPersistentStream(this.streamId, axonServerConfiguration.getEventFlowControl().getPermits().intValue(), axonServerConfiguration.getEventFlowControl().getNrOfNewPermits().intValue(), callbacks, this.persistentStreamProperties);
        this.persistentStreamHolder.set(persistentStream);
    }

    private void segmentOpened(PersistentStreamSegment persistentStreamSegment) {
        this.logger.info("Segment opened: {}", (Object)persistentStreamSegment);
        this.retrySeconds.set(1);
        this.segments.put(persistentStreamSegment.segment(), new SegmentConnection(persistentStreamSegment));
    }

    private void segmentClosed(PersistentStreamSegment persistentStreamSegment) {
        this.segments.remove(persistentStreamSegment.segment());
        this.logger.info("Segment closed: {}", (Object)persistentStreamSegment);
    }

    private void messageAvailable(PersistentStreamSegment persistentStreamSegment) {
        SegmentConnection segmentConnection = this.segments.get(persistentStreamSegment.segment());
        if (segmentConnection != null) {
            segmentConnection.messageAvailable();
        }
    }

    private void streamClosed(Throwable throwable) {
        this.persistentStreamHolder.set(null);
        if (throwable != null) {
            this.logger.info("{}: Rescheduling persistent stream", (Object)this.streamId, (Object)throwable);
            this.scheduler.schedule(this::start, (long)this.retrySeconds.getAndUpdate(current -> Math.min(60, current * 2)), TimeUnit.SECONDS);
        }
    }

    public void close() {
        PersistentStream persistentStream = this.persistentStreamHolder.getAndSet(null);
        if (persistentStream != null) {
            persistentStream.close();
            this.consumer.set(NO_OP_CONSUMER);
        }
    }

    private class SegmentConnection {
        private final AtomicBoolean processGate = new AtomicBoolean();
        private final AtomicBoolean doneConfirmed = new AtomicBoolean();
        private final PersistentStreamSegment persistentStreamSegment;
        private final AtomicReference<SegmentState> currentState = new AtomicReference<ProcessingState>(new ProcessingState());

        public SegmentConnection(PersistentStreamSegment persistentStreamSegment) {
            this.persistentStreamSegment = persistentStreamSegment;
        }

        private void processBatch(List<PersistentStreamEvent> batch) {
            List<TrackedEventMessage> eventMessages = this.upcastAndDeserialize(batch);
            if (!this.persistentStreamSegment.isClosed()) {
                long token = batch.get(batch.size() - 1).getEvent().getToken();
                PersistentStreamConnection.this.consumer.get().accept(eventMessages);
                if (PersistentStreamConnection.this.logger.isTraceEnabled()) {
                    PersistentStreamConnection.this.logger.trace("{}/{} processed {} entries", new Object[]{PersistentStreamConnection.this.streamId, this.persistentStreamSegment.segment(), eventMessages.size()});
                }
                this.persistentStreamSegment.acknowledge(token);
            }
        }

        public void messageAvailable() {
            if (!this.processGate.get()) {
                PersistentStreamConnection.this.scheduler.submit(this::readMessagesFromSegment);
            }
        }

        private void readMessagesFromSegment() {
            this.currentState.get().readMessages();
        }

        private List<TrackedEventMessage> upcastAndDeserialize(List<PersistentStreamEvent> batch) {
            return null;
        }

        private TrackingToken createToken(PersistentStreamEvent event) {
            if (!event.getReplay()) {
                return new GlobalSequenceTrackingToken(event.getEvent().getToken());
            }
            return ReplayToken.createReplayToken((TrackingToken)new GlobalSequenceTrackingToken(event.getEvent().getToken() + 1L), (TrackingToken)new GlobalSequenceTrackingToken(event.getEvent().getToken()));
        }

        private class ProcessingState
        implements SegmentState {
            private ProcessingState() {
            }

            @Override
            public void readMessages() {
                if (!SegmentConnection.this.processGate.compareAndSet(false, true)) {
                    return;
                }
                if (PersistentStreamConnection.this.logger.isTraceEnabled()) {
                    PersistentStreamConnection.this.logger.trace("{}[{}] readMessagesFromSegment - closed: {}", new Object[]{PersistentStreamConnection.this.streamId, SegmentConnection.this.persistentStreamSegment.segment(), SegmentConnection.this.persistentStreamSegment.isClosed()});
                }
                try {
                    List<PersistentStreamEvent> batch;
                    if (!SegmentConnection.this.persistentStreamSegment.isClosed() && !(batch = this.readBatch(SegmentConnection.this.persistentStreamSegment)).isEmpty()) {
                        try {
                            SegmentConnection.this.processBatch(batch);
                        }
                        catch (Exception ex) {
                            PersistentStreamConnection.this.logger.warn("{}: Exception while processing events for segment {}, retrying after {} second", new Object[]{PersistentStreamConnection.this.streamId, SegmentConnection.this.persistentStreamSegment.segment(), 1, ex});
                            SegmentConnection.this.currentState.set(new RetryState(batch));
                        }
                    }
                    this.acknowledgeDoneWhenClosed(SegmentConnection.this.persistentStreamSegment);
                }
                catch (StreamClosedException e) {
                    PersistentStreamConnection.this.logger.debug("{}: Stream closed for segment {}", (Object)PersistentStreamConnection.this.streamId, (Object)SegmentConnection.this.persistentStreamSegment.segment());
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    SegmentConnection.this.persistentStreamSegment.error(e.getMessage());
                    PersistentStreamConnection.this.logger.warn("{}: Exception while processing events for segment {}", new Object[]{PersistentStreamConnection.this.streamId, SegmentConnection.this.persistentStreamSegment.segment(), e});
                }
                finally {
                    SegmentConnection.this.processGate.set(false);
                    if (!SegmentConnection.this.persistentStreamSegment.isClosed() && SegmentConnection.this.persistentStreamSegment.peek() != null) {
                        PersistentStreamConnection.this.scheduler.submit(SegmentConnection.this::readMessagesFromSegment);
                    }
                }
            }

            private List<PersistentStreamEvent> readBatch(PersistentStreamSegment persistentStreamSegment) throws InterruptedException {
                LinkedList<PersistentStreamEvent> batch = new LinkedList<PersistentStreamEvent>();
                PersistentStreamEvent event = (PersistentStreamEvent)persistentStreamSegment.nextIfAvailable();
                if (event == null) {
                    return batch;
                }
                batch.add(event);
                while (batch.size() < PersistentStreamConnection.this.batchSize && !persistentStreamSegment.isClosed() && (event = (PersistentStreamEvent)persistentStreamSegment.nextIfAvailable(1L, TimeUnit.MILLISECONDS)) != null) {
                    batch.add(event);
                }
                return batch;
            }

            private void acknowledgeDoneWhenClosed(PersistentStreamSegment persistentStreamSegment) {
                if (persistentStreamSegment.isClosed() && SegmentConnection.this.doneConfirmed.compareAndSet(false, true)) {
                    persistentStreamSegment.acknowledge(-45L);
                }
            }
        }

        private class RetryState
        implements SegmentState {
            private final List<PersistentStreamEvent> batch;
            private final AtomicInteger retryInterval = new AtomicInteger(1);

            public RetryState(List<PersistentStreamEvent> batch) {
                this.batch = batch;
                PersistentStreamConnection.this.scheduler.schedule(this::retry, (long)this.retryInterval.get(), TimeUnit.SECONDS);
            }

            private void retry() {
                try {
                    SegmentConnection.this.processBatch(this.batch);
                    SegmentConnection.this.currentState.set(new ProcessingState());
                    PersistentStreamConnection.this.scheduler.submit(SegmentConnection.this::readMessagesFromSegment);
                }
                catch (Exception ex) {
                    int interval = this.retryInterval.updateAndGet(old -> Math.min(old * 2, 60));
                    PersistentStreamConnection.this.logger.warn("{}: Exception while retrying events for segment {}, retrying after {} seconds", new Object[]{PersistentStreamConnection.this.streamId, SegmentConnection.this.persistentStreamSegment.segment(), interval, ex});
                    PersistentStreamConnection.this.scheduler.schedule(this::retry, (long)interval, TimeUnit.SECONDS);
                }
            }

            @Override
            public void readMessages() {
            }
        }
    }

    private static interface SegmentState {
        public void readMessages();
    }
}

