/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.configuration.LifecycleRegistry;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.pooled.Coordinator;
import org.axonframework.eventhandling.pooled.MaxSegmentProvider;
import org.axonframework.eventhandling.pooled.WorkPackage;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PooledStreamingEventProcessor
extends AbstractEventProcessor
implements StreamingEventProcessor,
Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String name;
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    private final TokenStore tokenStore;
    private final TransactionManager transactionManager;
    private final ScheduledExecutorService workerExecutor;
    private final Coordinator coordinator;
    private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
    private final long tokenClaimInterval;
    private final MaxSegmentProvider maxSegmentProvider;
    private final long claimExtensionThreshold;
    private final int batchSize;
    private final Clock clock;
    private final AtomicReference<String> tokenStoreIdentifier = new AtomicReference();
    private final Map<Integer, TrackerStatus> processingStatus = new ConcurrentHashMap<Integer, TrackerStatus>();

    protected PooledStreamingEventProcessor(Builder builder) {
        super(builder);
        this.name = builder.name();
        this.messageSource = builder.messageSource;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.workerExecutor = builder.workerExecutorBuilder.apply(this.name);
        this.initialToken = builder.initialToken;
        this.tokenClaimInterval = builder.tokenClaimInterval;
        this.maxSegmentProvider = builder.maxSegmentProvider;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.batchSize = builder.batchSize;
        this.clock = builder.clock;
        this.coordinator = Coordinator.builder().name(this.name).messageSource(this.messageSource).tokenStore(this.tokenStore).transactionManager(this.transactionManager).executorService(builder.coordinatorExecutorBuilder.apply(this.name)).workPackageFactory(this::spawnWorker).eventFilter(event -> this.canHandleType(event.getPayloadType())).onMessageIgnored(x$0 -> this.reportIgnored((EventMessage<?>)x$0)).processingStatusUpdater(this::statusUpdater).tokenClaimInterval(this.tokenClaimInterval).claimExtensionThreshold(this.claimExtensionThreshold).clock(this.clock).maxSegmentProvider(this.maxSegmentProvider).initialSegmentCount(builder.initialSegmentCount).initialToken(this.initialToken).coordinatorClaimExtension(builder.coordinatorExtendsClaims).segmentReleasedAction(segment -> this.eventHandlerInvoker().segmentReleased((Segment)segment)).build();
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    public void registerLifecycleHandlers(@Nonnull LifecycleRegistry handle) {
        handle.onStart(0x3FFFFFFF, this::start);
        handle.onShutdown(0x3FFFFFFF, this::shutdownAsync);
    }

    @Override
    public void start() {
        logger.info("Starting PooledStreamingEventProcessor [{}].", (Object)this.name);
        this.coordinator.start();
    }

    @Override
    public void shutDown() {
        this.shutdownAsync().join();
    }

    @Override
    public CompletableFuture<Void> shutdownAsync() {
        logger.info("Stopping PooledStreamingEventProcessor [{}]", (Object)this.name);
        return this.coordinator.stop();
    }

    @Override
    public boolean isRunning() {
        return this.coordinator.isRunning();
    }

    @Override
    public boolean isError() {
        return this.coordinator.isError();
    }

    @Override
    public String getTokenStoreIdentifier() {
        return this.tokenStoreIdentifier.updateAndGet(i -> i != null ? i : this.calculateIdentifier());
    }

    private String calculateIdentifier() {
        return this.transactionManager.fetchInTransaction(() -> this.tokenStore.retrieveStorageIdentifier().orElse("--unknown--"));
    }

    @Override
    public void releaseSegment(int segmentId) {
        this.releaseSegment(segmentId, this.tokenClaimInterval * 2L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) {
        this.coordinator.releaseUntil(segmentId, GenericEventMessage.clock.instant().plusMillis(unit.toMillis(releaseDuration)));
    }

    @Override
    public CompletableFuture<Boolean> claimSegment(int segmentId) {
        return this.coordinator.claimSegment(segmentId);
    }

    @Override
    public CompletableFuture<Boolean> splitSegment(int segmentId) {
        if (!this.tokenStore.requiresExplicitSegmentInitialization()) {
            CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
            result.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely split tokens."));
            return result;
        }
        return this.coordinator.splitSegment(segmentId);
    }

    @Override
    public CompletableFuture<Boolean> mergeSegment(int segmentId) {
        if (!this.tokenStore.requiresExplicitSegmentInitialization()) {
            CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
            result.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely merge tokens."));
            return result;
        }
        return this.coordinator.mergeSegment(segmentId);
    }

    @Override
    public boolean supportsReset() {
        return this.eventHandlerInvoker().supportsReset();
    }

    @Override
    public void resetTokens() {
        this.resetTokens(this.initialToken);
    }

    @Override
    public <R> void resetTokens(R resetContext) {
        this.resetTokens(this.initialToken, resetContext);
    }

    @Override
    public void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier) {
        this.resetTokens(initialTrackingTokenSupplier.apply(this.messageSource));
    }

    @Override
    public <R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier, R resetContext) {
        this.resetTokens(initialTrackingTokenSupplier.apply(this.messageSource), resetContext);
    }

    @Override
    public void resetTokens(@Nonnull TrackingToken startPosition) {
        this.resetTokens(startPosition, null);
    }

    @Override
    public <R> void resetTokens(@Nonnull TrackingToken startPosition, R resetContext) {
        Assert.state(this.supportsReset(), () -> "The handlers assigned to this Processor do not support a reset.");
        Assert.state(!this.isRunning(), () -> "The Processor must be shut down before triggering a reset.");
        this.transactionManager.executeInTransaction(() -> {
            int[] segments = this.tokenStore.fetchSegments(this.getName());
            logger.debug("Processor [{}] will try to reset tokens for segments [{}].", (Object)this.name, (Object)segments);
            TrackingToken[] tokens = (TrackingToken[])Arrays.stream(segments).mapToObj(segment -> this.tokenStore.fetchToken(this.getName(), segment)).toArray(TrackingToken[]::new);
            this.eventHandlerInvoker().performReset(resetContext, null);
            IntStream.range(0, tokens.length).forEach(i -> this.tokenStore.storeToken(ReplayToken.createReplayToken(tokens[i], startPosition, resetContext), this.getName(), segments[i]));
            logger.info("Processor [{}] successfully reset tokens for segments [{}].", (Object)this.name, (Object)segments);
        });
    }

    @Override
    public int maxCapacity() {
        return this.maxSegmentProvider.getMaxSegments(this.name);
    }

    @Override
    public Map<Integer, EventTrackerStatus> processingStatus() {
        return Collections.unmodifiableMap(this.processingStatus);
    }

    private WorkPackage spawnWorker(Segment segment, TrackingToken initialToken) {
        return WorkPackage.builder().name(this.name).tokenStore(this.tokenStore).transactionManager(this.transactionManager).executorService(this.workerExecutor).eventFilter((x$0, x$1) -> this.canHandle(x$0, x$1)).batchProcessor((x$0, x$1, x$2) -> this.processInUnitOfWork(x$0, x$1, x$2)).segment(segment).initialToken(initialToken).batchSize(this.batchSize).claimExtensionThreshold(this.claimExtensionThreshold).segmentStatusUpdater(this.singleStatusUpdater(segment.getSegmentId(), new TrackerStatus(segment, initialToken))).clock(this.clock).build();
    }

    private Consumer<UnaryOperator<TrackerStatus>> singleStatusUpdater(int segmentId, TrackerStatus initialStatus) {
        return statusUpdater -> this.processingStatus.compute(segmentId, (s, status) -> (TrackerStatus)statusUpdater.apply(status == null ? initialStatus : status));
    }

    private void statusUpdater(int segmentId, UnaryOperator<TrackerStatus> segmentUpdater) {
        this.processingStatus.computeIfPresent(segmentId, (s, ts) -> (TrackerStatus)segmentUpdater.apply((TrackerStatus)ts));
    }

    public static class Builder
    extends AbstractEventProcessor.Builder {
        private StreamableMessageSource<TrackedEventMessage<?>> messageSource;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private Function<String, ScheduledExecutorService> coordinatorExecutorBuilder;
        private Function<String, ScheduledExecutorService> workerExecutorBuilder;
        private int initialSegmentCount = 16;
        private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken = ms -> ReplayToken.createReplayToken(ms.createHeadToken());
        private long tokenClaimInterval = 5000L;
        private MaxSegmentProvider maxSegmentProvider = MaxSegmentProvider.maxShort();
        private long claimExtensionThreshold = 5000L;
        private int batchSize = 1;
        private Clock clock = GenericEventMessage.clock;
        private boolean coordinatorExtendsClaims = false;

        protected Builder() {
            this.rollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE);
        }

        @Override
        public Builder name(@Nonnull String name) {
            super.name(name);
            return this;
        }

        @Override
        public Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        @Override
        public Builder rollbackConfiguration(@Nonnull RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        @Override
        public Builder errorHandler(@Nonnull ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        @Override
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        @Override
        public Builder spanFactory(@Nonnull EventProcessorSpanFactory spanFactory) {
            super.spanFactory(spanFactory);
            return this;
        }

        public Builder messageSource(@Nonnull StreamableMessageSource<TrackedEventMessage<?>> messageSource) {
            BuilderUtils.assertNonNull(messageSource, "StreamableMessageSource may not be null");
            this.messageSource = messageSource;
            return this;
        }

        public Builder tokenStore(@Nonnull TokenStore tokenStore) {
            BuilderUtils.assertNonNull(tokenStore, "TokenStore may not be null");
            this.tokenStore = tokenStore;
            return this;
        }

        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        public Builder coordinatorExecutor(@Nonnull ScheduledExecutorService coordinatorExecutor) {
            BuilderUtils.assertNonNull(coordinatorExecutor, "The Coordinator's ScheduledExecutorService may not be null");
            this.coordinatorExecutorBuilder = ignored -> coordinatorExecutor;
            return this;
        }

        public Builder coordinatorExecutor(@Nonnull Function<String, ScheduledExecutorService> coordinatorExecutorBuilder) {
            BuilderUtils.assertNonNull(coordinatorExecutorBuilder, "The Coordinator's ScheduledExecutorService builder may not be null");
            this.coordinatorExecutorBuilder = coordinatorExecutorBuilder;
            return this;
        }

        public Builder workerExecutor(@Nonnull ScheduledExecutorService workerExecutor) {
            BuilderUtils.assertNonNull(workerExecutor, "The Worker's ScheduledExecutorService may not be null");
            this.workerExecutorBuilder = ignored -> workerExecutor;
            return this;
        }

        public Builder workerExecutor(@Nonnull Function<String, ScheduledExecutorService> workerExecutorBuilder) {
            BuilderUtils.assertNonNull(workerExecutorBuilder, "The Worker's ScheduledExecutorService builder may not be null");
            this.workerExecutorBuilder = workerExecutorBuilder;
            return this;
        }

        public Builder initialSegmentCount(int initialSegmentCount) {
            BuilderUtils.assertStrictPositive(initialSegmentCount, "The initial segment count should be a higher valuer than zero");
            this.initialSegmentCount = initialSegmentCount;
            return this;
        }

        public Builder initialToken(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken) {
            BuilderUtils.assertNonNull(initialToken, "The initial token builder Function may not be null");
            this.initialToken = initialToken;
            return this;
        }

        public Builder tokenClaimInterval(long tokenClaimInterval) {
            BuilderUtils.assertStrictPositive(tokenClaimInterval, "Token claim interval should be a higher valuer than zero");
            this.tokenClaimInterval = tokenClaimInterval;
            return this;
        }

        public Builder maxClaimedSegments(int maxClaimedSegments) {
            this.maxSegmentProvider = n -> maxClaimedSegments;
            return this;
        }

        public Builder maxSegmentProvider(MaxSegmentProvider maxSegmentProvider) {
            BuilderUtils.assertNonNull(maxSegmentProvider, "The max segment provider may not be null. Provide a lambda of type (processorName: String) -> maxSegmentsToClaim");
            BuilderUtils.assertStrictPositive(maxSegmentProvider.getMaxSegments(this.name), "Max claimed segments should be a higher valuer than zero");
            this.maxSegmentProvider = maxSegmentProvider;
            return this;
        }

        public Builder claimExtensionThreshold(long claimExtensionThreshold) {
            BuilderUtils.assertStrictPositive(claimExtensionThreshold, "The claim extension threshold should be a higher valuer than zero");
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        public Builder batchSize(int batchSize) {
            BuilderUtils.assertStrictPositive(batchSize, "The batch size should be a higher valuer than zero");
            this.batchSize = batchSize;
            return this;
        }

        public Builder clock(@Nonnull Clock clock) {
            BuilderUtils.assertNonNull(clock, "Clock may not be null");
            this.clock = clock;
            return this;
        }

        public Builder enableCoordinatorClaimExtension() {
            this.coordinatorExtendsClaims = true;
            return this;
        }

        public PooledStreamingEventProcessor build() {
            return new PooledStreamingEventProcessor(this);
        }

        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(this.messageSource, "The StreamableMessageSource is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.tokenStore, "The TokenStore is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.transactionManager, "The TransactionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.coordinatorExecutorBuilder, "The Coordinator ScheduledExecutorService is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.workerExecutorBuilder, "The Worker ScheduledExecutorService is a hard requirement and should be provided");
        }

        public String name() {
            return this.name;
        }
    }
}

