/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import jakarta.annotation.Nonnull;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.event.EventProcessorInfoUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.annotations.Internal;
import org.axonframework.configuration.Configuration;
import org.axonframework.eventhandling.processors.EventProcessor;
import org.axonframework.eventhandling.processors.streaming.StreamingEventProcessor;
import org.axonframework.eventhandling.processors.streaming.token.store.TokenStore;
import org.axonframework.eventhandling.processors.subscribing.SubscribingEventProcessor;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registers the {@link EventProcessor} components found in the {@link Configuration} with the control channel of an
 * Axon Server connection and requests the configured load balancing strategies through the admin channel of that same
 * connection.
 * <p>
 * All work is triggered by {@link #start()}; constructing the service only validates and stores its collaborators.
 */
@Internal
public class EventProcessorControlService {

    private static final Logger logger = LoggerFactory.getLogger(EventProcessorControlService.class);

    private final AxonServerConnectionManager axonServerConnectionManager;
    private final Configuration configuration;
    private final String context;
    private final Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorConfig;

    /**
     * Constructs the service. None of the arguments may be {@code null}.
     *
     * @param configuration     the configuration used to look up the event processors, token stores and unit of work
     *                          factories
     * @param connectionManager the manager from which the connection for the given {@code context} is obtained
     * @param context           the name of the context to obtain the connection for
     * @param processorConfig   the processor settings, keyed by processor name
     * @throws NullPointerException if any of the arguments is {@code null}
     */
    public EventProcessorControlService(@Nonnull Configuration configuration,
                                        @Nonnull AxonServerConnectionManager connectionManager,
                                        @Nonnull String context,
                                        @Nonnull Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorConfig) {
        this.axonServerConnectionManager =
                Objects.requireNonNull(connectionManager, "The Axon Server Connection Manager must not be null.");
        this.configuration = Objects.requireNonNull(configuration, "The Configuration must not be null.");
        this.context = Objects.requireNonNull(context, "The Context must not be null.");
        this.processorConfig =
                Objects.requireNonNull(processorConfig, "The Processor Configuration must not be null.");
    }

    /**
     * Looks up all {@link EventProcessor} components, registers an instruction handler for each of them on the control
     * channel, and subsequently requests the configured load balancing strategies.
     */
    public void start() {
        Map<String, EventProcessor> eventProcessors = configuration.getComponents(EventProcessor.class);
        AxonServerConnection connection = axonServerConnectionManager.getConnection(context);

        registerInstructionHandlers(connection, eventProcessors);
        setLoadBalancingStrategies(connection, eventProcessors.keySet());
    }

    /**
     * Registers every given processor on the control channel of the {@code connection}, together with a supplier of
     * its {@link EventProcessorInfo} and an {@link AxonProcessorInstructionHandler}.
     *
     * @param connection      the connection whose control channel is used
     * @param eventProcessors the processors to register, keyed by name
     */
    private void registerInstructionHandlers(@Nonnull AxonServerConnection connection,
                                             @Nonnull Map<String, EventProcessor> eventProcessors) {
        ControlChannel controlChannel = connection.controlChannel();
        eventProcessors.forEach((name, processor) -> controlChannel.registerEventProcessor(
                name,
                infoSupplier(processor),
                new AxonProcessorInstructionHandler(processor, name)
        ));
    }

    /**
     * Selects the {@link EventProcessorInfoUtils} describe-method matching the runtime type of the given processor.
     *
     * @param processor the processor to describe
     * @return a supplier that describes the processor each time it is invoked
     */
    @Nonnull
    private Supplier<EventProcessorInfo> infoSupplier(@Nonnull EventProcessor processor) {
        if (processor instanceof StreamingEventProcessor streamingProcessor) {
            return () -> EventProcessorInfoUtils.describeStreaming(streamingProcessor);
        }
        if (processor instanceof SubscribingEventProcessor subscribingProcessor) {
            return () -> EventProcessorInfoUtils.describeSubscribing(subscribingProcessor);
        }
        return () -> EventProcessorInfoUtils.describeUnknown(processor);
    }

    /**
     * Requests the configured load balancing strategy for every configured processor that is also present in
     * {@code processorNames}. Configured processors that are not present are logged and skipped, as are processors for
     * which no token store identifier can be resolved.
     *
     * @param connection     the connection whose admin channel is used
     * @param processorNames the names of the processors that were found in the configuration
     */
    private void setLoadBalancingStrategies(AxonServerConnection connection, Set<String> processorNames) {
        AdminChannel adminChannel = connection.adminChannel();

        Map<String, String> strategiesPerProcessor =
                processorConfig.entrySet()
                               .stream()
                               .filter(entry -> isRegistered(entry.getKey(), processorNames))
                               .collect(Collectors.toMap(
                                       Map.Entry::getKey,
                                       entry -> entry.getValue().getLoadBalancingStrategy()
                               ));

        strategiesPerProcessor.forEach((processorName, strategy) -> {
            Optional<String> optionalIdentifier = processorTokenStoreOrGlobal(processorName);
            if (optionalIdentifier.isEmpty()) {
                logger.warn("Cannot find token store identifier for processor [{}]. "
                                    + "Load balancing cannot be configured without this identifier.",
                            processorName);
                return;
            }
            String tokenStoreIdentifier = optionalIdentifier.get();

            adminChannel.loadBalanceEventProcessor(processorName, tokenStoreIdentifier, strategy)
                        .whenComplete((result, error) -> {
                            if (error == null) {
                                logger.debug(
                                        "Successfully requested to load balance processor [{}] with strategy [{}].",
                                        processorName, strategy
                                );
                                return;
                            }
                            // The trailing Throwable is logged by SLF4J as the exception, not as a format argument.
                            logger.warn("Requesting to load balance processor [{}] with strategy [{}] failed.",
                                        processorName, strategy, error);
                        });

            if (processorConfig.get(processorName).isAutomaticBalancing()) {
                adminChannel.setAutoLoadBalanceStrategy(processorName, tokenStoreIdentifier, strategy)
                            .whenComplete((result, error) -> {
                                if (error == null) {
                                    logger.debug("Successfully requested to automatically balance processor [{}] "
                                                         + "with strategy [{}].",
                                                 processorName, strategy);
                                    return;
                                }
                                logger.warn("Requesting to automatically balance processor [{}] "
                                                    + "with strategy [{}] failed.",
                                            processorName, strategy, error);
                            });
            }
        });
    }

    /**
     * Checks whether a configured processor name belongs to one of the known processors, logging when it does not.
     *
     * @param processorName  the configured processor name to check
     * @param processorNames the names of the processors that were found in the configuration
     * @return {@code true} if {@code processorNames} contains {@code processorName}, {@code false} otherwise
     */
    private static boolean isRegistered(String processorName, Set<String> processorNames) {
        if (processorNames.contains(processorName)) {
            return true;
        }
        logger.info("Event Processor [{}] is not registered. "
                            + "Please check the name or register the Event Processor.",
                    processorName);
        return false;
    }

    /**
     * Resolves the storage identifier of the {@link TokenStore} belonging to the given processor.
     * <p>
     * Both the {@link TokenStore} and the {@link UnitOfWorkFactory} are first looked up by processor-specific name in
     * the module configuration named after the processor, falling back to the unnamed component of the main
     * configuration. The identifier is retrieved inside a freshly created {@link UnitOfWork}, and this method blocks
     * until that retrieval completes.
     *
     * @param processorName the name of the processor to resolve the token store identifier for
     * @return the storage identifier, or an empty {@code Optional} when the token store, the unit of work factory or
     * the identifier itself is unavailable
     */
    private Optional<String> processorTokenStoreOrGlobal(String processorName) {
        var moduleConfiguration = configuration.getModuleConfiguration(processorName);

        Optional<TokenStore> tokenStore =
                moduleConfiguration
                        .flatMap(m -> m.getOptionalComponent(TokenStore.class, "TokenStore[" + processorName + "]"))
                        .or(() -> configuration.getOptionalComponent(TokenStore.class));
        if (tokenStore.isEmpty()) {
            logger.warn("Cannot find TokenStore for processor [{}]. "
                                + "Please ensure the processor module is properly configured.",
                        processorName);
            return Optional.empty();
        }

        Optional<UnitOfWorkFactory> unitOfWorkFactory =
                moduleConfiguration
                        .flatMap(m -> m.getOptionalComponent(UnitOfWorkFactory.class,
                                                             "UnitOfWorkFactory[" + processorName + "]"))
                        .or(() -> configuration.getOptionalComponent(UnitOfWorkFactory.class));
        if (unitOfWorkFactory.isEmpty()) {
            logger.warn("Cannot find UnitOfWorkFactory for processor [{}]. "
                                + "Please ensure the processor module is properly configured.",
                        processorName);
            return Optional.empty();
        }

        TokenStore store = tokenStore.get();
        UnitOfWork unitOfWork = unitOfWorkFactory.get().create();
        String storageIdentifier = FutureUtils.joinAndUnwrap(
                unitOfWork.executeWithResult(ctx -> store.retrieveStorageIdentifier(ctx))
        );
        // ofNullable: a store without an identifier is reported as "not found" instead of causing an NPE.
        return Optional.ofNullable(storageIdentifier);
    }

    /**
     * {@link ProcessorInstructionHandler} that forwards instructions to a single {@link EventProcessor}.
     * <p>
     * Segment instructions (release, split and merge) are only forwarded to a {@link StreamingEventProcessor}; for any
     * other processor type they are logged and answered with {@code false}.
     *
     * @param processor the processor the instructions are forwarded to
     * @param name      the name of the processor, used in log messages
     */
    protected record AxonProcessorInstructionHandler(EventProcessor processor, String name)
            implements ProcessorInstructionHandler {

        /**
         * Releases the given segment of the processor.
         *
         * @param segmentId the identifier of the segment to release
         * @return a future completed with {@code true} once the release was requested, with {@code false} if the
         * processor is not a {@link StreamingEventProcessor}, or completed exceptionally if the request threw
         */
        @Override
        public CompletableFuture<Boolean> releaseSegment(int segmentId) {
            try {
                if (!(processor instanceof StreamingEventProcessor streamingProcessor)) {
                    logger.info(
                            "Release segment requested for processor [{}] which is not a Streaming Event Processor",
                            name
                    );
                    return CompletableFuture.completedFuture(false);
                }
                streamingProcessor.releaseSegment(segmentId);
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
            return CompletableFuture.completedFuture(true);
        }

        /**
         * Splits the given segment of the processor and logs the outcome.
         *
         * @param segmentId the identifier of the segment to split
         * @return the result reported by the processor, a future completed with {@code false} if the processor is
         * not a {@link StreamingEventProcessor}, or a future completed exceptionally if the request threw
         */
        @Override
        public CompletableFuture<Boolean> splitSegment(int segmentId) {
            try {
                if (!(processor instanceof StreamingEventProcessor streamingProcessor)) {
                    logger.info(
                            "Split segment requested for processor [{}] which is not a Streaming Event Processor",
                            name
                    );
                    return CompletableFuture.completedFuture(false);
                }
                return streamingProcessor.splitSegment(segmentId).thenApply(result -> {
                    if (Boolean.TRUE.equals(result)) {
                        logger.info("Successfully split segment [{}] of processor [{}]", segmentId, name);
                    } else {
                        logger.warn("Was not able to split segment [{}] for processor [{}]", segmentId, name);
                    }
                    return result;
                });
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        }

        /**
         * Merges the given segment of the processor and logs the outcome.
         *
         * @param segmentId the identifier of the segment to merge
         * @return the result reported by the processor, a future completed with {@code false} if the processor is
         * not a {@link StreamingEventProcessor}, or a future completed exceptionally if the request threw
         */
        @Override
        public CompletableFuture<Boolean> mergeSegment(int segmentId) {
            try {
                if (!(processor instanceof StreamingEventProcessor streamingProcessor)) {
                    logger.warn(
                            "Merge segment request received for processor [{}] which is not a Streaming Event Processor",
                            name
                    );
                    return CompletableFuture.completedFuture(false);
                }
                return streamingProcessor.mergeSegment(segmentId).thenApply(result -> {
                    if (Boolean.TRUE.equals(result)) {
                        logger.info("Successfully merged segment [{}] of processor [{}]", segmentId, name);
                    } else {
                        logger.warn("Was not able to merge segment [{}] for processor [{}]", segmentId, name);
                    }
                    return result;
                });
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        }

        /**
         * Pauses the processor by invoking its {@code shutdown()} method.
         *
         * @return the future returned by the processor's {@code shutdown()}
         */
        @Override
        public CompletableFuture<Void> pauseProcessor() {
            return processor.shutdown();
        }

        /**
         * Starts the processor by invoking its {@code start()} method.
         *
         * @return the future returned by the processor's {@code start()}
         */
        @Override
        public CompletableFuture<Void> startProcessor() {
            return processor.start();
        }
    }
}

