/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.processors.streaming.pooled;

import jakarta.annotation.Nonnull;
import java.time.Clock;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.annotations.Internal;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.configuration.Configuration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.configuration.EventProcessorConfiguration;
import org.axonframework.eventhandling.processors.errorhandling.ErrorHandler;
import org.axonframework.eventhandling.processors.streaming.pooled.EventSchedulingProcessingContext;
import org.axonframework.eventhandling.processors.streaming.pooled.MaxSegmentProvider;
import org.axonframework.eventhandling.processors.streaming.token.ReplayToken;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.store.TokenStore;
import org.axonframework.eventhandling.tracing.EventProcessorSpanFactory;
import org.axonframework.eventstreaming.EventCriteria;
import org.axonframework.eventstreaming.StreamableEventSource;
import org.axonframework.eventstreaming.TrackingTokenSource;
import org.axonframework.messaging.ConfigurationApplicationContext;
import org.axonframework.messaging.EmptyApplicationContext;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWorkFactory;
import org.axonframework.monitoring.MessageMonitor;

/**
 * Fluent configuration holder for a pooled streaming event processor.
 * <p>
 * Extends {@link EventProcessorConfiguration} with the settings specific to this processor type: the
 * {@link StreamableEventSource}, the {@link TokenStore}, the coordinator and worker
 * {@link ScheduledExecutorService executors}, segment and token-claim settings, batch size, {@link Clock},
 * event criteria, the handler for ignored messages, and the supplier of scheduling
 * {@link ProcessingContext processing contexts}.
 * <p>
 * Every setter returns {@code this} so calls can be chained. {@link #validate()} checks that all mandatory
 * components have been provided. Instances are mutable; no synchronization is performed by this class.
 */
public class PooledStreamingEventProcessorConfiguration extends EventProcessorConfiguration {

    // Mandatory components: no default, checked by validate().
    private StreamableEventSource<? extends EventMessage> eventSource;
    private TokenStore tokenStore;
    private ScheduledExecutorService coordinatorExecutor;
    private ScheduledExecutorService workerExecutor;

    // Optional components, each initialised with a default.
    private int initialSegmentCount = 16;
    private Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken =
            es -> es.firstToken(null).thenApply(ReplayToken::createReplayToken);
    // NOTE(review): time unit is not visible in this class (presumably milliseconds) - confirm.
    private long tokenClaimInterval = 5000L;
    private MaxSegmentProvider maxSegmentProvider = MaxSegmentProvider.maxShort();
    // NOTE(review): time unit is not visible in this class (presumably milliseconds) - confirm.
    private long claimExtensionThreshold = 5000L;
    private int batchSize = 1;
    private Clock clock = GenericEventMessage.clock;
    private boolean coordinatorExtendsClaims = false;
    private Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider =
            supportedEvents -> EventCriteria.havingAnyTag().andBeingOneOfTypes(supportedEvents);
    // The monitor is read when the handler runs, so a monitor configured later is still honoured.
    private Consumer<? super EventMessage> ignoredMessageHandler =
            eventMessage -> messageMonitor.onMessageIngested(eventMessage).reportIgnored();
    private Supplier<ProcessingContext> schedulingProcessingContextProvider =
            () -> new EventSchedulingProcessingContext(EmptyApplicationContext.INSTANCE);

    /**
     * Creates a configuration in which every optional setting has its default value and no mandatory
     * component is set yet.
     */
    @Internal
    public PooledStreamingEventProcessorConfiguration() {
    }

    /**
     * Creates a configuration that passes the given {@code base} to the superclass constructor. All settings
     * declared by this class start at their defaults.
     *
     * @param base the base configuration handed to the superclass constructor
     */
    @Internal
    public PooledStreamingEventProcessorConfiguration(@Nonnull EventProcessorConfiguration base) {
        super(base);
    }

    /**
     * Creates a configuration that passes the given {@code base} to the superclass constructor and replaces
     * the default scheduling processing context provider with one that wraps the given {@code configuration}
     * in a {@link ConfigurationApplicationContext}.
     *
     * @param base          the base configuration handed to the superclass constructor
     * @param configuration the configuration exposed through the scheduling processing contexts
     */
    @Internal
    public PooledStreamingEventProcessorConfiguration(@Nonnull EventProcessorConfiguration base,
                                                      @Nonnull Configuration configuration) {
        super(base);
        this.schedulingProcessingContextProvider =
                () -> new EventSchedulingProcessingContext(new ConfigurationApplicationContext(configuration));
    }

    /**
     * Delegates to the superclass and narrows the return type for fluent chaining.
     *
     * @param errorHandler the error handler to configure
     * @return this configuration, for chaining
     */
    @Override
    public PooledStreamingEventProcessorConfiguration errorHandler(@Nonnull ErrorHandler errorHandler) {
        super.errorHandler(errorHandler);
        return this;
    }

    /**
     * Delegates to the superclass and narrows the return type for fluent chaining.
     *
     * @param messageMonitor the message monitor to configure
     * @return this configuration, for chaining
     */
    @Override
    public PooledStreamingEventProcessorConfiguration messageMonitor(
            @Nonnull MessageMonitor<? super EventMessage> messageMonitor) {
        super.messageMonitor(messageMonitor);
        return this;
    }

    /**
     * Delegates to the superclass and narrows the return type for fluent chaining.
     *
     * @param spanFactory the span factory to configure
     * @return this configuration, for chaining
     */
    @Override
    public PooledStreamingEventProcessorConfiguration spanFactory(@Nonnull EventProcessorSpanFactory spanFactory) {
        super.spanFactory(spanFactory);
        return this;
    }

    /**
     * Delegates to the superclass and narrows the return type for fluent chaining.
     *
     * @param unitOfWorkFactory the unit of work factory to configure
     * @return this configuration, for chaining
     */
    @Override
    public PooledStreamingEventProcessorConfiguration unitOfWorkFactory(@Nonnull UnitOfWorkFactory unitOfWorkFactory) {
        super.unitOfWorkFactory(unitOfWorkFactory);
        return this;
    }

    /**
     * Sets the mandatory {@link StreamableEventSource}.
     *
     * @param eventSource the event source; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration eventSource(
            @Nonnull StreamableEventSource<? extends EventMessage> eventSource) {
        BuilderUtils.assertNonNull(eventSource, "StreamableEventSource may not be null");
        this.eventSource = eventSource;
        return this;
    }

    /**
     * Appends the given interceptor to the inherited list of interceptors.
     *
     * @param interceptor the interceptor to add
     * @return this configuration, for chaining
     */
    @Nonnull
    public PooledStreamingEventProcessorConfiguration withInterceptor(
            @Nonnull MessageHandlerInterceptor<? super EventMessage> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    /**
     * Sets the mandatory {@link TokenStore}.
     *
     * @param tokenStore the token store; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration tokenStore(@Nonnull TokenStore tokenStore) {
        BuilderUtils.assertNonNull(tokenStore, "TokenStore may not be null");
        this.tokenStore = tokenStore;
        return this;
    }

    /**
     * Sets the mandatory executor for the coordinator.
     *
     * @param coordinatorExecutor the coordinator's executor; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration coordinatorExecutor(
            @Nonnull ScheduledExecutorService coordinatorExecutor) {
        BuilderUtils.assertNonNull(coordinatorExecutor, "The Coordinator's ScheduledExecutorService may not be null");
        this.coordinatorExecutor = coordinatorExecutor;
        return this;
    }

    /**
     * Sets the mandatory executor for the workers.
     *
     * @param workerExecutor the workers' executor; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration workerExecutor(@Nonnull ScheduledExecutorService workerExecutor) {
        BuilderUtils.assertNonNull(workerExecutor, "The Worker's ScheduledExecutorService may not be null");
        this.workerExecutor = workerExecutor;
        return this;
    }

    /**
     * Sets the initial segment count. Defaults to {@code 16}.
     *
     * @param initialSegmentCount the segment count; checked with {@code BuilderUtils.assertStrictPositive}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration initialSegmentCount(int initialSegmentCount) {
        BuilderUtils.assertStrictPositive(initialSegmentCount, "The initial segment count should be a higher valuer than zero");
        this.initialSegmentCount = initialSegmentCount;
        return this;
    }

    /**
     * Sets the function that builds the initial {@link TrackingToken} from a {@link TrackingTokenSource}.
     * The default takes the source's first token and wraps it with {@link ReplayToken#createReplayToken}.
     *
     * @param initialToken the initial token builder; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration initialToken(
            @Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken) {
        BuilderUtils.assertNonNull(initialToken, "The initial token builder Function may not be null");
        this.initialToken = initialToken;
        return this;
    }

    /**
     * Sets the token claim interval. Defaults to {@code 5000}.
     *
     * @param tokenClaimInterval the interval (unit presumably milliseconds - confirm); checked with
     *                           {@code BuilderUtils.assertStrictPositive}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration tokenClaimInterval(long tokenClaimInterval) {
        BuilderUtils.assertStrictPositive(tokenClaimInterval, "Token claim interval should be a higher valuer than zero");
        this.tokenClaimInterval = tokenClaimInterval;
        return this;
    }

    /**
     * Replaces the {@link MaxSegmentProvider} with one that always returns the given constant, whatever
     * argument it is invoked with. The value is not validated here.
     *
     * @param maxClaimedSegments the constant the provider will return
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration maxClaimedSegments(int maxClaimedSegments) {
        this.maxSegmentProvider = ignoredProcessorName -> maxClaimedSegments;
        return this;
    }

    /**
     * Sets the {@link MaxSegmentProvider}. Defaults to {@link MaxSegmentProvider#maxShort()}.
     *
     * @param maxSegmentProvider the provider; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration maxSegmentProvider(@Nonnull MaxSegmentProvider maxSegmentProvider) {
        BuilderUtils.assertNonNull(maxSegmentProvider, "The max segment provider may not be null. Provide a lambda of type (processorName: String) -> maxSegmentsToClaim");
        this.maxSegmentProvider = maxSegmentProvider;
        return this;
    }

    /**
     * Sets the claim extension threshold. Defaults to {@code 5000}.
     *
     * @param claimExtensionThreshold the threshold (unit presumably milliseconds - confirm); checked with
     *                                {@code BuilderUtils.assertStrictPositive}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration claimExtensionThreshold(long claimExtensionThreshold) {
        BuilderUtils.assertStrictPositive(claimExtensionThreshold, "The claim extension threshold should be a higher valuer than zero");
        this.claimExtensionThreshold = claimExtensionThreshold;
        return this;
    }

    /**
     * Sets the batch size. Defaults to {@code 1}.
     *
     * @param batchSize the batch size; checked with {@code BuilderUtils.assertStrictPositive}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration batchSize(int batchSize) {
        BuilderUtils.assertStrictPositive(batchSize, "The batch size should be a higher valuer than zero");
        this.batchSize = batchSize;
        return this;
    }

    /**
     * Sets the {@link Clock}. Defaults to {@code GenericEventMessage.clock}.
     *
     * @param clock the clock; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration clock(@Nonnull Clock clock) {
        BuilderUtils.assertNonNull(clock, "Clock may not be null");
        this.clock = clock;
        return this;
    }

    /**
     * Switches the {@code coordinatorExtendsClaims} flag on. The flag defaults to {@code false} and this
     * class offers no way to switch it back off.
     *
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration enableCoordinatorClaimExtension() {
        this.coordinatorExtendsClaims = true;
        return this;
    }

    /**
     * Sets the handler for ignored messages. The default reports the message as ignored to the configured
     * message monitor.
     *
     * @param ignoredMessageHandler the handler; checked with {@code BuilderUtils.assertNonNull}, like the
     *                              other component setters, so a {@code null} is rejected here rather than
     *                              failing later when the handler is used
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration ignoredMessageHandler(
            @Nonnull Consumer<? super EventMessage> ignoredMessageHandler) {
        BuilderUtils.assertNonNull(ignoredMessageHandler, "The ignored message handler may not be null");
        this.ignoredMessageHandler = ignoredMessageHandler;
        return this;
    }

    /**
     * Sets the function that builds the {@link EventCriteria} from a set of event names. The default
     * combines {@code EventCriteria.havingAnyTag()} with {@code andBeingOneOfTypes} on the given set.
     *
     * @param eventCriteriaProvider the criteria builder; checked with {@code BuilderUtils.assertNonNull}
     * @return this configuration, for chaining
     */
    public PooledStreamingEventProcessorConfiguration eventCriteria(
            @Nonnull Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider) {
        BuilderUtils.assertNonNull(eventCriteriaProvider, "EventCriteria builder function may not be null");
        this.eventCriteriaProvider = eventCriteriaProvider;
        return this;
    }

    /**
     * Sets the supplier of scheduling {@link ProcessingContext processing contexts}.
     *
     * @param schedulingProcessingContextProvider the supplier to use
     * @return this configuration, for chaining
     * @throws NullPointerException if the given supplier is {@code null}
     */
    public PooledStreamingEventProcessorConfiguration schedulingProcessingContextProvider(
            @Nonnull Supplier<ProcessingContext> schedulingProcessingContextProvider) {
        // Deliberately kept as Objects.requireNonNull so the exception type callers may rely on is unchanged.
        Objects.requireNonNull(schedulingProcessingContextProvider, "schedulingProcessingContextProvider may not be null.");
        this.schedulingProcessingContextProvider = schedulingProcessingContextProvider;
        return this;
    }

    /**
     * Runs the superclass validation and then checks, in order, that the event source, token store, unit of
     * work factory, coordinator executor, worker executor and scheduling processing context provider are set.
     *
     * @throws AxonConfigurationException as declared; raised when the configuration is incomplete
     */
    @Override
    protected void validate() throws AxonConfigurationException {
        super.validate();
        BuilderUtils.assertNonNull(eventSource, "The StreamableEventSource is a hard requirement and should be provided");
        BuilderUtils.assertNonNull(tokenStore, "The TokenStore is a hard requirement and should be provided");
        BuilderUtils.assertNonNull(unitOfWorkFactory, "The UnitOfWorkFactory is a hard requirement and should be provided");
        BuilderUtils.assertNonNull(coordinatorExecutor, "The Coordinator ScheduledExecutorService is a hard requirement and should be provided");
        BuilderUtils.assertNonNull(workerExecutor, "The Worker ScheduledExecutorService is a hard requirement and should be provided");
        BuilderUtils.assertNonNull(schedulingProcessingContextProvider, "The Scheduling Processing Context Provider is a hard requirement and should be provided");
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean streaming() {
        return true;
    }

    /**
     * @return the configured event source, or {@code null} if none has been set
     */
    public StreamableEventSource<? extends EventMessage> eventSource() {
        return eventSource;
    }

    /**
     * @return the configured token store, or {@code null} if none has been set
     */
    public TokenStore tokenStore() {
        return tokenStore;
    }

    /**
     * @return the configured coordinator executor, or {@code null} if none has been set
     */
    public ScheduledExecutorService coordinatorExecutor() {
        return coordinatorExecutor;
    }

    /**
     * @return the configured worker executor, or {@code null} if none has been set
     */
    public ScheduledExecutorService workerExecutor() {
        return workerExecutor;
    }

    /**
     * @return the configured initial segment count
     */
    public int initialSegmentCount() {
        return initialSegmentCount;
    }

    /**
     * @return the function that builds the initial tracking token
     */
    public Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken() {
        return initialToken;
    }

    /**
     * @return the configured token claim interval
     */
    public long tokenClaimInterval() {
        return tokenClaimInterval;
    }

    /**
     * @return the configured max segment provider
     */
    public MaxSegmentProvider maxSegmentProvider() {
        return maxSegmentProvider;
    }

    /**
     * @return the configured claim extension threshold
     */
    public long claimExtensionThreshold() {
        return claimExtensionThreshold;
    }

    /**
     * @return the configured batch size
     */
    public int batchSize() {
        return batchSize;
    }

    /**
     * @return the configured clock
     */
    public Clock clock() {
        return clock;
    }

    /**
     * @return {@code true} once {@link #enableCoordinatorClaimExtension()} has been called, {@code false}
     * otherwise
     */
    public boolean coordinatorExtendsClaims() {
        return coordinatorExtendsClaims;
    }

    /**
     * @return the function that builds the event criteria
     */
    public Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider() {
        return eventCriteriaProvider;
    }

    /**
     * @return the handler for ignored messages
     */
    public Consumer<? super EventMessage> ignoredMessageHandler() {
        return ignoredMessageHandler;
    }

    /**
     * @return the supplier of scheduling processing contexts
     */
    public Supplier<ProcessingContext> schedulingProcessingContextProvider() {
        return schedulingProcessingContextProvider;
    }

    /**
     * Describes the superclass properties followed by every setting declared by this class.
     *
     * @param descriptor the descriptor that receives the properties
     */
    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        super.describeTo(descriptor);
        descriptor.describeProperty("eventSource", eventSource);
        descriptor.describeProperty("tokenStore", tokenStore);
        descriptor.describeProperty("coordinatorExecutor", coordinatorExecutor);
        descriptor.describeProperty("workerExecutor", workerExecutor);
        descriptor.describeProperty("initialSegmentCount", initialSegmentCount);
        descriptor.describeProperty("initialToken", initialToken);
        descriptor.describeProperty("tokenClaimInterval", tokenClaimInterval);
        descriptor.describeProperty("maxSegmentProvider", maxSegmentProvider);
        descriptor.describeProperty("claimExtensionThreshold", claimExtensionThreshold);
        descriptor.describeProperty("batchSize", batchSize);
        descriptor.describeProperty("clock", clock);
        descriptor.describeProperty("coordinatorExtendsClaims", coordinatorExtendsClaims);
        descriptor.describeProperty("eventCriteriaProvider", eventCriteriaProvider);
        // Previously omitted; included so the description covers every configurable field.
        descriptor.describeProperty("ignoredMessageHandler", ignoredMessageHandler);
        descriptor.describeProperty("schedulingProcessingContextProvider", schedulingProcessingContextProvider);
    }
}

