/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingException;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.eventhandling.Segment;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.tracing.SpanFactory;

/**
 * Event processor that subscribes itself to a {@link SubscribableMessageSource} and hands every
 * list of events it receives to the configured {@link EventProcessingStrategy}, which calls back
 * into {@link #process(List)}.
 * <p>
 * Instances are created through {@link #builder()}.
 */
public class SubscribingEventProcessor extends AbstractEventProcessor implements Lifecycle {

    private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    private final EventProcessingStrategy processingStrategy;
    private final TransactionManager transactionManager;
    /** Handle of the active subscription; {@code null} while this processor is not subscribed. */
    private volatile Registration eventBusRegistration;

    /**
     * Creates a processor from the settings held by the given builder.
     *
     * @param builder the builder carrying the message source, processing strategy and transaction manager
     */
    protected SubscribingEventProcessor(Builder builder) {
        super(builder);
        this.messageSource = builder.messageSource;
        this.processingStrategy = builder.processingStrategy;
        this.transactionManager = builder.transactionManager;
    }

    /**
     * Creates a new {@link Builder}. The builder starts with
     * {@link DirectEventProcessingStrategy#INSTANCE}, {@link NoTransactionManager#INSTANCE} and the
     * {@link RollbackConfigurationType#ANY_THROWABLE} rollback configuration; the message source has
     * no default and must be set.
     *
     * @return a fresh builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Registers {@link #start()} as start handler and {@link #shutDown()} as shutdown handler, both
     * with the value {@code 0} (presumably the lifecycle phase - confirm against
     * {@code Lifecycle.LifecycleRegistry}).
     *
     * @param handle the registry to register the handlers with
     */
    @Override
    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry handle) {
        handle.onStart(0, this::start);
        handle.onShutdown(0, this::shutDown);
    }

    /**
     * Subscribes to the message source, unless a subscription is already held, in which case this
     * call does nothing. Received events are passed to the processing strategy with
     * {@link #process(List)} as the callback.
     */
    @Override
    public void start() {
        // NOTE(review): the null check and the assignment below are not atomic; two concurrent
        // start() calls could both subscribe - confirm that lifecycle calls are serialized.
        if (this.eventBusRegistration != null) {
            return;
        }
        this.eventBusRegistration = this.messageSource.subscribe(
                eventMessages -> this.processingStrategy.handle(eventMessages, this::process)
        );
    }

    /**
     * @return {@code true} while a subscription to the message source is held
     */
    @Override
    public boolean isRunning() {
        return this.eventBusRegistration != null;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isError() {
        return false;
    }

    /**
     * Processes the given events in a single {@link BatchingUnitOfWork}.
     * <p>
     * Events for which {@code canHandle(event, Segment.ROOT_SEGMENT)} returns {@code false} are
     * passed to {@code reportIgnored} and left out of the batch. When no event remains, the method
     * returns without creating a unit of work.
     *
     * @param eventMessages the events to process
     * @throws EventProcessingException when a checked exception occurs while processing; runtime
     *                                  exceptions are rethrown unchanged
     */
    protected void process(List<? extends EventMessage<?>> eventMessages) {
        try {
            List<EventMessage<?>> batch = new ArrayList<>();
            for (EventMessage<?> eventMessage : eventMessages) {
                if (this.canHandle(eventMessage, Segment.ROOT_SEGMENT)) {
                    batch.add(eventMessage);
                } else {
                    this.reportIgnored(eventMessage);
                }
            }
            if (batch.isEmpty()) {
                return;
            }
            BatchingUnitOfWork<EventMessage<?>> unitOfWork = new BatchingUnitOfWork<>(batch);
            unitOfWork.attachTransaction(this.transactionManager);
            this.processInUnitOfWork(batch, unitOfWork);
        } catch (RuntimeException e) {
            // Runtime exceptions are passed on as-is; only checked exceptions get wrapped below.
            throw e;
        } catch (Exception e) {
            throw new EventProcessingException("Exception occurred while processing events", e);
        }
    }

    /**
     * Closes the current subscription, if any, through {@link IOUtils#closeQuietly} and clears the
     * stored registration so that {@link #isRunning()} returns {@code false}.
     */
    @Override
    public void shutDown() {
        IOUtils.closeQuietly(this.eventBusRegistration);
        this.eventBusRegistration = null;
    }

    /**
     * @return the message source this processor subscribes to
     */
    public SubscribableMessageSource<? extends EventMessage<?>> getMessageSource() {
        return this.messageSource;
    }

    /**
     * Builder for a {@link SubscribingEventProcessor}.
     * <p>
     * Defaults: {@link DirectEventProcessingStrategy#INSTANCE} as processing strategy,
     * {@link NoTransactionManager#INSTANCE} as transaction manager and
     * {@link RollbackConfigurationType#ANY_THROWABLE} as rollback configuration. The
     * {@link SubscribableMessageSource} has no default and is checked in {@link #validate()}.
     */
    public static class Builder extends AbstractEventProcessor.Builder {

        private SubscribableMessageSource<? extends EventMessage<?>> messageSource;
        private EventProcessingStrategy processingStrategy = DirectEventProcessingStrategy.INSTANCE;
        private TransactionManager transactionManager = NoTransactionManager.INSTANCE;

        /**
         * Creates a builder with the rollback configuration preset to
         * {@link RollbackConfigurationType#ANY_THROWABLE}.
         */
        public Builder() {
            super.rollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE);
        }

        /**
         * Delegates to the superclass builder; overridden to return this builder type for chaining.
         *
         * @param name the processor name
         * @return this builder
         */
        @Override
        public Builder name(@Nonnull String name) {
            super.name(name);
            return this;
        }

        /**
         * Delegates to the superclass builder; overridden to return this builder type for chaining.
         *
         * @param eventHandlerInvoker the invoker to configure
         * @return this builder
         */
        @Override
        public Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        /**
         * Delegates to the superclass builder, replacing the {@code ANY_THROWABLE} preset applied
         * by the constructor.
         *
         * @param rollbackConfiguration the rollback configuration to configure
         * @return this builder
         */
        @Override
        public Builder rollbackConfiguration(@Nonnull RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        /**
         * Delegates to the superclass builder; overridden to return this builder type for chaining.
         *
         * @param errorHandler the error handler to configure
         * @return this builder
         */
        @Override
        public Builder errorHandler(@Nonnull ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        /**
         * Delegates to the superclass builder; overridden to return this builder type for chaining.
         *
         * @param messageMonitor the message monitor to configure
         * @return this builder
         */
        @Override
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        /**
         * Delegates to the superclass builder; overridden to return this builder type for chaining.
         *
         * @param spanFactory the event processor span factory to configure
         * @return this builder
         */
        @Override
        public Builder spanFactory(@Nonnull EventProcessorSpanFactory spanFactory) {
            super.spanFactory(spanFactory);
            return this;
        }

        /**
         * Delegates to the superclass builder; overridden to return this builder type for chaining.
         *
         * @param spanFactory the span factory to configure
         * @return this builder
         * @deprecated use {@link #spanFactory(EventProcessorSpanFactory)} instead
         */
        @Override
        @Deprecated
        public Builder spanFactory(@Nonnull SpanFactory spanFactory) {
            super.spanFactory(spanFactory);
            return this;
        }

        /**
         * Sets the message source the processor subscribes to. The argument is checked with
         * {@link BuilderUtils#assertNonNull} before it is stored.
         *
         * @param messageSource the source of event messages; may not be {@code null}
         * @return this builder
         */
        public Builder messageSource(@Nonnull SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
            BuilderUtils.assertNonNull(messageSource, "SubscribableMessageSource may not be null");
            this.messageSource = messageSource;
            return this;
        }

        /**
         * Sets the strategy that receives incoming events together with the processing callback.
         * The argument is checked with {@link BuilderUtils#assertNonNull} before it is stored.
         *
         * @param processingStrategy the strategy to use; may not be {@code null}
         * @return this builder
         */
        public Builder processingStrategy(@Nonnull EventProcessingStrategy processingStrategy) {
            BuilderUtils.assertNonNull(processingStrategy, "EventProcessingStrategy may not be null");
            this.processingStrategy = processingStrategy;
            return this;
        }

        /**
         * Sets the transaction manager that is attached to each unit of work created while
         * processing. The argument is checked with {@link BuilderUtils#assertNonNull} before it is
         * stored.
         *
         * @param transactionManager the transaction manager to use; may not be {@code null}
         * @return this builder
         */
        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Creates a {@link SubscribingEventProcessor} from the current settings of this builder.
         *
         * @return the new processor
         */
        public SubscribingEventProcessor build() {
            return new SubscribingEventProcessor(this);
        }

        /**
         * Runs the superclass validation and additionally checks that a message source was set.
         * NOTE(review): presumably invoked from the superclass constructor - confirm in
         * {@code AbstractEventProcessor}.
         *
         * @throws AxonConfigurationException declared by the signature; presumably raised by the
         *                                    failing checks - confirm against {@code BuilderUtils}
         */
        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(this.messageSource, "The SubscribableMessageSource is a hard requirement and should be provided");
        }
    }
}

