/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.MultiSourceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.StreamableMessageSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiStreamableMessageSource
implements StreamableMessageSource<TrackedEventMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(MultiStreamableMessageSource.class);
    private final List<IdentifiedStreamableMessageSource> eventStreams;
    private final Comparator<Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator;

    public static Builder builder() {
        return new Builder();
    }

    protected MultiStreamableMessageSource(Builder builder) {
        this.eventStreams = builder.messageSources();
        this.trackedEventComparator = builder.trackedEventComparator;
    }

    public MultiSourceBlockingStream openStream(TrackingToken trackingToken) {
        if (trackingToken == null) {
            return this.openStream((TrackingToken)this.createTailToken());
        }
        if (trackingToken instanceof MultiSourceTrackingToken) {
            return new MultiSourceBlockingStream(this.eventStreams, (MultiSourceTrackingToken)trackingToken, this.trackedEventComparator);
        }
        throw new IllegalArgumentException("Incompatible token type provided.");
    }

    public MultiSourceTrackingToken createTailToken() {
        HashMap tokenMap = new HashMap();
        this.eventStreams.forEach(streamableMessageSource -> tokenMap.put(streamableMessageSource.sourceId(), streamableMessageSource.createTailToken()));
        return new MultiSourceTrackingToken(tokenMap);
    }

    public MultiSourceTrackingToken createHeadToken() {
        HashMap tokenMap = new HashMap();
        this.eventStreams.forEach(streamableMessageSource -> tokenMap.put(streamableMessageSource.sourceId(), streamableMessageSource.createHeadToken()));
        return new MultiSourceTrackingToken(tokenMap);
    }

    public MultiSourceTrackingToken createTokenAt(Instant dateTime) {
        HashMap tokenMap = new HashMap();
        this.eventStreams.forEach(streamableMessageSource -> tokenMap.put(streamableMessageSource.sourceId(), streamableMessageSource.createTokenAt(dateTime)));
        return new MultiSourceTrackingToken(tokenMap);
    }

    public MultiSourceTrackingToken createTokenSince(Duration duration) {
        HashMap tokenMap = new HashMap();
        this.eventStreams.forEach(streamableMessageSource -> tokenMap.put(streamableMessageSource.sourceId(), streamableMessageSource.createTokenSince(duration)));
        return new MultiSourceTrackingToken(tokenMap);
    }

    private static class MultiSourceBlockingStream
    implements BlockingStream<TrackedEventMessage<?>> {
        private final List<SourceIdAwareBlockingStream> messageStreams;
        private final Map<String, SourceIdAwareBlockingStream> streamBySourceId;
        private final Comparator<? super Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator;
        private MultiSourceTrackingToken trackingToken;
        private TrackedEventMessage<?> peekedMessage;

        public MultiSourceBlockingStream(Iterable<IdentifiedStreamableMessageSource> messageSources, MultiSourceTrackingToken trackingToken, Comparator<? super Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator) {
            this.trackedEventComparator = trackedEventComparator;
            this.messageStreams = new ArrayList<SourceIdAwareBlockingStream>();
            this.trackingToken = trackingToken;
            this.streamBySourceId = new HashMap<String, SourceIdAwareBlockingStream>();
            try {
                messageSources.forEach(src -> {
                    SourceIdAwareBlockingStream stream = new SourceIdAwareBlockingStream(src.sourceId(), src.openStream(trackingToken.getTokenForStream(src.sourceId())));
                    this.messageStreams.add(stream);
                    this.streamBySourceId.put(src.sourceId(), stream);
                });
            }
            catch (Exception e) {
                this.messageStreams.forEach(SourceIdAwareBlockingStream::close);
                throw e;
            }
        }

        public boolean hasNextAvailable() {
            return this.messageStreams.stream().anyMatch(BlockingStream::hasNextAvailable);
        }

        public Optional<TrackedEventMessage<?>> peek() {
            if (this.peekedMessage == null) {
                this.peekedMessage = this.doConsumeNext();
            }
            return Optional.ofNullable(this.peekedMessage);
        }

        private TrackedEventMessage<?> doConsumeNext() {
            HashMap candidateMessagesToReturn = new HashMap();
            this.peekForMessages(candidateMessagesToReturn);
            Optional<Map.Entry<String, TrackedEventMessage<?>>> chosenMessage = candidateMessagesToReturn.entrySet().stream().min(this.trackedEventComparator);
            return chosenMessage.map(e -> {
                String streamId = (String)e.getKey();
                TrackedEventMessage message = (TrackedEventMessage)e.getValue();
                try {
                    MultiSourceTrackingToken advancedToken = this.trackingToken.advancedTo(streamId, message.trackingToken());
                    return ((TrackedEventMessage)this.messageSource(streamId).nextAvailable()).withTrackingToken((TrackingToken)advancedToken);
                }
                catch (InterruptedException ex) {
                    logger.warn("Thread Interrupted whilst consuming next message", (Throwable)ex);
                    Thread.currentThread().interrupt();
                    return null;
                }
            }).orElse(null);
        }

        private BlockingStream<TrackedEventMessage<?>> messageSource(String sourceId) {
            return this.streamBySourceId.get(sourceId);
        }

        public boolean hasNextAvailable(int timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
            long longPollTime = unit.toMillis(timeout) / 10L;
            while (System.currentTimeMillis() < deadline) {
                Iterator<SourceIdAwareBlockingStream> it = this.messageStreams.iterator();
                while (it.hasNext()) {
                    SourceIdAwareBlockingStream current = it.next();
                    if (!(it.hasNext() ? current.hasNextAvailable() : current.hasNextAvailable((int)Math.min(longPollTime, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS))) continue;
                    return true;
                }
            }
            return false;
        }

        public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
            MultiSourceTrackingToken newTrackingToken;
            if (this.peekedMessage != null) {
                TrackedEventMessage<?> next = this.peekedMessage;
                this.peekedMessage = null;
                this.trackingToken = (MultiSourceTrackingToken)next.trackingToken();
                return next;
            }
            HashMap candidateMessagesToReturn = new HashMap();
            while (candidateMessagesToReturn.size() == 0) {
                this.peekForMessages(candidateMessagesToReturn);
            }
            String streamIdOfMessage = candidateMessagesToReturn.entrySet().stream().min(this.trackedEventComparator).map(Map.Entry::getKey).orElse(null);
            TrackedEventMessage messageToReturn = (TrackedEventMessage)this.messageSource(streamIdOfMessage).nextAvailable();
            this.trackingToken = newTrackingToken = this.trackingToken.advancedTo(streamIdOfMessage, messageToReturn.trackingToken());
            logger.debug("Message consumed from stream: {}", (Object)streamIdOfMessage);
            return messageToReturn.withTrackingToken((TrackingToken)newTrackingToken);
        }

        private void peekForMessages(Map<String, TrackedEventMessage<?>> candidateMessagesToReturn) {
            for (SourceIdAwareBlockingStream singleMessageSource : this.messageStreams) {
                Optional<TrackedEventMessage<?>> currentPeekedMessage = singleMessageSource.peek();
                currentPeekedMessage.ifPresent(trackedEventMessage -> candidateMessagesToReturn.put(singleMessageSource.sourceId, (TrackedEventMessage<?>)trackedEventMessage));
            }
        }

        public void close() {
            this.messageStreams.forEach(SourceIdAwareBlockingStream::close);
        }
    }

    private static class SourceIdAwareBlockingStream
    implements BlockingStream<TrackedEventMessage<?>> {
        private final String sourceId;
        private final BlockingStream<TrackedEventMessage<?>> delegate;

        public SourceIdAwareBlockingStream(String sourceId, BlockingStream<TrackedEventMessage<?>> delegate) {
            this.sourceId = sourceId;
            this.delegate = delegate;
        }

        public boolean hasNextAvailable() {
            return this.delegate.hasNextAvailable();
        }

        public Optional<TrackedEventMessage<?>> peek() {
            return this.delegate.peek();
        }

        public boolean hasNextAvailable(int timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.hasNextAvailable(timeout, unit);
        }

        public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
            return (TrackedEventMessage)this.delegate.nextAvailable();
        }

        public void close() {
            this.delegate.close();
        }

        public Stream<TrackedEventMessage<?>> asStream() {
            return this.delegate.asStream();
        }
    }

    private static class IdentifiedStreamableMessageSource
    implements StreamableMessageSource<TrackedEventMessage<?>> {
        private final StreamableMessageSource<TrackedEventMessage<?>> delegate;
        private final String sourceId;

        public IdentifiedStreamableMessageSource(String sourceId, StreamableMessageSource<TrackedEventMessage<?>> delegate) {
            this.delegate = delegate;
            this.sourceId = sourceId;
        }

        public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
            return this.delegate.openStream(trackingToken);
        }

        public TrackingToken createTailToken() {
            return this.delegate.createTailToken();
        }

        public TrackingToken createHeadToken() {
            return this.delegate.createHeadToken();
        }

        public TrackingToken createTokenAt(Instant dateTime) {
            return this.delegate.createTokenAt(dateTime);
        }

        public TrackingToken createTokenSince(Duration duration) {
            return this.delegate.createTokenSince(duration);
        }

        public String sourceId() {
            return this.sourceId;
        }
    }

    public static class Builder {
        private Comparator<Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator = Comparator.comparing(t -> ((TrackedEventMessage)t.getValue()).getTimestamp());
        private Map<String, StreamableMessageSource<TrackedEventMessage<?>>> messageSourceMap = new LinkedHashMap();
        private String longPollingSource = "";

        public Builder addMessageSource(String messageSourceId, StreamableMessageSource<TrackedEventMessage<?>> messageSource) {
            BuilderUtils.assertThat((Object)messageSourceId, sourceName -> !this.messageSourceMap.containsKey(sourceName), (String)"the messageSource name must be unique");
            this.messageSourceMap.put(messageSourceId, messageSource);
            return this;
        }

        public Builder trackedEventComparator(Comparator<Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator) {
            this.trackedEventComparator = trackedEventComparator;
            return this;
        }

        public Builder longPollingSource(String longPollingSource) {
            BuilderUtils.assertThat((Object)longPollingSource, sourceName -> this.messageSourceMap.containsKey(sourceName), (String)"Current configuration does not contain this message source");
            this.longPollingSource = longPollingSource;
            return this;
        }

        public MultiStreamableMessageSource build() {
            return new MultiStreamableMessageSource(this);
        }

        private List<IdentifiedStreamableMessageSource> messageSources() {
            ArrayList<IdentifiedStreamableMessageSource> sourceList = new ArrayList<IdentifiedStreamableMessageSource>();
            this.messageSourceMap.forEach((sourceId, source) -> {
                if (!this.longPollingSource.equals(sourceId)) {
                    sourceList.add(new IdentifiedStreamableMessageSource((String)sourceId, (StreamableMessageSource<TrackedEventMessage<?>>)source));
                }
            });
            if (this.messageSourceMap.containsKey(this.longPollingSource)) {
                sourceList.add(new IdentifiedStreamableMessageSource(this.longPollingSource, this.messageSourceMap.get(this.longPollingSource)));
            }
            return sourceList;
        }
    }
}

