/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mule.runtime.module.batch.internal.engine.buffer;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchEngine;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueManager;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate;
import com.mulesoft.mule.runtime.module.batch.internal.engine.DefaultBatchEngine;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.AggregatorRecordBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.BufferUtils;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.RecordBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.StreamingAggregatorInputQueueBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.StreamingAggregatorIteratorWrapper;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.StreamingAggregatorOutputQueueBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.BatchTransactionContextProvider;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.ManagedBatchTransactionContextProvider;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.tracer.api.EventTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingAggregatorBuffer
extends AggregatorRecordBuffer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAggregatorBuffer.class);
    private final StreamingAggregatorInputQueueBuffer inputQueueBuffer;
    private final RecordBuffer outputQueueBuffer;
    private final BatchQueueManager queueManager;
    private final Set<BatchJobInstanceAdapter> flushingInstances = Collections.newSetFromMap(new ConcurrentHashMap());
    private final BatchTransactionContextProvider inputContexts;
    private final BatchTransactionContextProvider outputContexts;

    public StreamingAggregatorBuffer(BatchEngine batchEngine, BatchStepAdapter step, Processor chain, NotificationDispatcher notificationDispatcher, MuleContext muleContext, EventTracer<CoreEvent> coreEventTracer) {
        super(String.format("batch-step-%s-streaming-aggregator-buffer", step.getName()), 0, batchEngine, step, chain, step.getLocation(), notificationDispatcher, muleContext, coreEventTracer);
        this.queueManager = batchEngine.getBatchQueueManager();
        this.inputQueueBuffer = new StreamingAggregatorInputQueueBuffer(batchEngine, step);
        this.outputQueueBuffer = new StreamingAggregatorOutputQueueBuffer(batchEngine, step);
        this.inputContexts = this.newProvider();
        this.outputContexts = this.newProvider();
    }

    @Override
    public int add(BatchJobInstanceAdapter jobInstance, BatchTransactionContext ctx, Record record) {
        record.getCompletionCallback().incrementConsumers();
        return BufferUtils.addToBufferInSplitTransaction(jobInstance, this.inputQueueBuffer, ctx, this.inputContexts, record);
    }

    @Override
    public void forget(BatchJobInstanceAdapter jobInstance) {
        this.inputQueueBuffer.flushAndForget(jobInstance);
    }

    @Override
    public void flushAndForget(BatchJobInstanceAdapter jobInstance) {
        this.flush(jobInstance);
    }

    @Override
    public void flush(BatchJobInstanceAdapter jobInstance) {
        if (!this.flushingInstances.add(jobInstance)) {
            return;
        }
        try {
            this.forget(jobInstance);
            this.doFlush(jobInstance, (Multimap<BatchTransactionContext, Record>)ArrayListMultimap.create());
        }
        finally {
            this.flushingInstances.remove(jobInstance);
        }
    }

    @Override
    public synchronized long size(BatchJobInstanceAdapter jobInstance) {
        return this.inputQueueBuffer.size(jobInstance) + this.inputQueueBuffer.getFlushedRecordCount(jobInstance);
    }

    @Override
    protected CoreEvent configureChainEvent(CoreEvent event, BatchJobInstanceAdapter jobInstance, Multimap<BatchTransactionContext, Record> records) throws MuleException {
        BatchTransactionContext ctx = this.inputContexts.get(jobInstance);
        Iterator<List<Record>> iterator = this.queueManager.streamingAggregatorInputQueue(jobInstance, this.step).iterator(ctx);
        StreamingAggregatorIteratorWrapper wrapper = new StreamingAggregatorIteratorWrapper(iterator, this.outputContexts, ctx, jobInstance, this.outputQueueBuffer, this.batchEngine.getBlockSize(jobInstance));
        if (!wrapper.hasNext()) {
            wrapper.commit();
            wrapper.close();
            return null;
        }
        return CoreEvent.builder((CoreEvent)event).message(Message.builder((Message)event.getMessage()).value((Object)wrapper).build()).build();
    }

    private void flushOutputBuffer(BatchJobInstanceAdapter jobInstance) {
        this.outputQueueBuffer.flushAndForget(jobInstance);
    }

    private void backToSteppingQueue(BatchJobInstanceAdapter jobInstance, Exception e) throws MuleException {
        BatchQueueDelegate outputQueue = this.queueManager.streamingAggregatorOutputQueue(jobInstance, this.step);
        BatchTransactionContext ctx = this.outputContexts.get(jobInstance);
        try {
            while (outputQueue.size(ctx) > 0L) {
                List<Record> records = outputQueue.poll(ctx);
                this.updateStatisticsAndRoute(ctx, jobInstance, records, e);
                ctx.beginTransaction();
            }
            BatchUtils.commit(ctx);
        }
        catch (Exception thrown) {
            BatchUtils.rollback(ctx);
            throw thrown;
        }
    }

    private void updateStatisticsAndRoute(BatchTransactionContext ctx, BatchJobInstanceAdapter jobInstance, List<Record> records, Exception e) throws MuleException {
        if (e != null) {
            this.markError(jobInstance, records, e);
        }
        this.batchEngine.updateStatisticsAndRoute(ctx, records);
        records.clear();
        BatchUtils.commit(ctx);
    }

    @Override
    protected BatchProcessingTemplate makeProcessingTemplate(Multimap<BatchTransactionContext, Record> records, Processor processor, MuleContext muleContext) {
        return new BatchProcessingTemplate((ReactiveProcessor)processor, this.getLocation(), muleContext.getFlowTraceManager(), muleContext.getStreamCloserService()){
            private StreamingAggregatorIteratorWrapper streamingIterator;

            @Override
            public CoreEvent process(BatchJobInstanceAdapter jobInstance, CoreEvent event, EventTracer<CoreEvent> coreEventTracer) throws MuleException {
                this.streamingIterator = (StreamingAggregatorIteratorWrapper)event.getMessage().getPayload().getValue();
                return super.process(jobInstance, event, coreEventTracer);
            }

            @Override
            protected void onSuccess(BatchJobInstanceAdapter jobInstance, CoreEvent responseEvent) throws MuleException {
                StreamingAggregatorBuffer.this.route(jobInstance, this.streamingIterator, null);
                this.streamingIterator.commit();
            }

            @Override
            protected void onException(BatchJobInstanceAdapter jobInstance, Exception e, CoreEvent event) throws MuleException {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Exception was found processing streaming aggregator on step %s for instance %s of job %s", StreamingAggregatorBuffer.this.step.getName(), jobInstance.getId(), jobInstance.getOwnerJobName()));
                    LOGGER.debug(DefaultBatchEngine.buildExceptionLogMessage(e));
                }
                StreamingAggregatorBuffer.this.route(jobInstance, this.streamingIterator, e);
                this.streamingIterator.rollback();
            }
        };
    }

    private void consumePayload(BatchJobInstanceAdapter jobInstance, StreamingAggregatorIteratorWrapper stream) {
        while (stream.hasNext()) {
            stream.next();
        }
        Record previous = stream.getPrevious();
        if (previous != null) {
            BufferUtils.addToBufferInSplitTransaction(jobInstance, this.outputQueueBuffer, null, this.outputContexts, previous);
        }
    }

    private void route(BatchJobInstanceAdapter jobInstance, StreamingAggregatorIteratorWrapper stream, Exception e) throws MuleException {
        this.consumePayload(jobInstance, stream);
        this.flushOutputBuffer(jobInstance);
        this.backToSteppingQueue(jobInstance, e);
    }

    private BatchTransactionContextProvider newProvider() {
        return new ManagedBatchTransactionContextProvider(this.batchEngine, true);
    }
}

