/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.disruptor;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.component.disruptor.AbstractLifecycleAwareExchangeEventHandler;
import org.apache.camel.component.disruptor.DisruptorComponent;
import org.apache.camel.component.disruptor.DisruptorConsumer;
import org.apache.camel.component.disruptor.DisruptorEndpoint;
import org.apache.camel.component.disruptor.DisruptorNotStartedException;
import org.apache.camel.component.disruptor.DisruptorProducerType;
import org.apache.camel.component.disruptor.DisruptorWaitStrategy;
import org.apache.camel.component.disruptor.ExchangeEvent;
import org.apache.camel.component.disruptor.ExchangeEventFactory;
import org.apache.camel.component.disruptor.LifecycleAwareExchangeEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorReference {
    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorReference.class);
    /** Endpoints registered through {@link #addEndpoint}; the set is backed by a {@link WeakHashMap}. */
    private final Set<DisruptorEndpoint> endpoints = Collections.newSetFromMap(new WeakHashMap<DisruptorEndpoint, Boolean>(4));
    private final DisruptorComponent component;
    private final String uri;
    private final String name;
    /**
     * The active disruptor together with a "change pending" mark. As used in this class:
     * (null, false) = not started or shut down; (null, true) = being reconfigured;
     * (instance, false) = running. The mark is never set together with a non-null reference.
     */
    private final AtomicMarkableReference<Disruptor<ExchangeEvent>> disruptor = new AtomicMarkableReference<Disruptor<ExchangeEvent>>(null, false);
    /** Executor handed to the disruptor; it only queues tasks until {@code start()} forwards them to {@link #executor}. */
    private final DelayedExecutor delayedExecutor = new DelayedExecutor();
    private final DisruptorProducerType producerType;
    /** Ring buffer size; also the capacity of {@link #temporaryExchangeBuffer}. */
    private final int size;
    private final DisruptorWaitStrategy waitStrategy;
    /** Holds exchanges taken off a ring buffer by the placeholder handler until they are re-published on the next start. */
    private final Queue<Exchange> temporaryExchangeBuffer;
    /** Serializes reconfiguration, shutdown and endpoint registration. */
    private final Lock lock = new ReentrantLock();
    /** Thread pool running the event handlers; {@code null} while no pool exists. */
    private ExecutorService executor;
    private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0];
    /** Number of consumers counted when the current disruptor was created; passed along with every published exchange. */
    private int uniqueConsumerCount;

    /**
     * Creates the reference and immediately (re)configures it, which creates and starts a disruptor.
     *
     * @param component    owning component, used to reach the Camel context's executor service manager
     * @param uri          endpoint URI, used when naming the thread pool and in {@link #toString()}
     * @param name         name reported by {@link #getName()}
     * @param size         ring buffer size; also the capacity of the temporary exchange buffer
     * @param producerType producer type handed to the disruptor
     * @param waitStrategy wait strategy handed to the disruptor
     * @throws Exception if the initial {@link #reconfigure()} fails
     */
    DisruptorReference(DisruptorComponent component, String uri, String name, int size, DisruptorProducerType producerType, DisruptorWaitStrategy waitStrategy) throws Exception {
        // Identity of this reference.
        this.component = component;
        this.name = name;
        this.uri = uri;

        // Disruptor settings.
        this.producerType = producerType;
        this.waitStrategy = waitStrategy;
        this.size = size;

        // The buffer must exist before the first start, which drains it.
        this.temporaryExchangeBuffer = new ArrayBlockingQueue<Exchange>(size);

        reconfigure();
    }

    /**
     * @return {@code true} when no disruptor instance is currently set, regardless of the pending mark
     */
    public boolean hasNullReference() {
        Disruptor<ExchangeEvent> current = disruptor.getReference();
        return current == null;
    }

    /**
     * Returns the active disruptor, waiting while a reconfiguration is pending.
     *
     * @return the current, non-null disruptor
     * @throws DisruptorNotStartedException if no disruptor is set and no change is pending
     */
    private Disruptor<ExchangeEvent> getCurrentDisruptor() throws DisruptorNotStartedException {
        Disruptor<ExchangeEvent> active = disruptor.getReference();
        if (active != null) {
            return active;
        }
        // No instance yet: read reference and mark together to tell "reconfiguring" from "stopped".
        boolean[] pendingMark = new boolean[1];
        for (;;) {
            active = disruptor.get(pendingMark);
            if (active != null) {
                return active;
            }
            if (!pendingMark[0]) {
                throw new DisruptorNotStartedException("Disruptor is not yet started or already shut down.");
            }
            // A change is pending; back off briefly and look again.
            LockSupport.parkNanos(1L);
        }
    }

    /**
     * Publishes the exchange on the current ring buffer, claiming the slot with {@code tryNext()}.
     *
     * @param exchange the exchange to publish
     * @throws DisruptorNotStartedException  if no disruptor is running and none is pending
     * @throws InsufficientCapacityException propagated from the ring buffer's {@code tryNext()}
     */
    public void tryPublish(Exchange exchange) throws DisruptorNotStartedException, InsufficientCapacityException {
        RingBuffer<ExchangeEvent> target = getCurrentDisruptor().getRingBuffer();
        tryPublishExchangeOnRingBuffer(exchange, target);
    }

    /**
     * Publishes the exchange on the current ring buffer, claiming the slot with {@code next()}.
     *
     * @param exchange the exchange to publish
     * @throws DisruptorNotStartedException if no disruptor is running and none is pending
     */
    public void publish(Exchange exchange) throws DisruptorNotStartedException {
        RingBuffer<ExchangeEvent> target = getCurrentDisruptor().getRingBuffer();
        publishExchangeOnRingBuffer(exchange, target);
    }

    /**
     * Claims the next sequence with {@code next()}, stores the exchange and the current consumer
     * count in that slot, then publishes the sequence.
     */
    private void publishExchangeOnRingBuffer(Exchange exchange, RingBuffer<ExchangeEvent> ringBuffer) {
        final long claimed = ringBuffer.next();
        final ExchangeEvent slot = ringBuffer.get(claimed);
        slot.setExchange(exchange, uniqueConsumerCount);
        ringBuffer.publish(claimed);
    }

    /**
     * Same as {@code publishExchangeOnRingBuffer} but claims the sequence with {@code tryNext()}.
     *
     * @throws InsufficientCapacityException propagated from {@code tryNext()}
     */
    private void tryPublishExchangeOnRingBuffer(Exchange exchange, RingBuffer<ExchangeEvent> ringBuffer) throws InsufficientCapacityException {
        final long claimed = ringBuffer.tryNext();
        final ExchangeEvent slot = ringBuffer.get(claimed);
        slot.setExchange(exchange, uniqueConsumerCount);
        ringBuffer.publish(claimed);
    }

    /**
     * Shuts the current disruptor down (marking a change as pending) and starts a fresh one,
     * all while holding the lock.
     *
     * @throws Exception if starting the new disruptor fails
     */
    public void reconfigure() throws Exception {
        final Lock guard = lock;
        guard.lock();
        try {
            LOGGER.debug("Reconfiguring disruptor {}", this);
            shutdownDisruptor(true);
            start();
        } finally {
            guard.unlock();
        }
    }

    /**
     * Creates and starts a new disruptor, hands the queued tasks to the real executor, waits for
     * every handler to report that it started, re-publishes buffered exchanges and finally makes
     * the new disruptor visible with the pending mark cleared.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is retried and the interrupt
     * status is restored once all waits have finished.
     *
     * @throws Exception if creating the disruptor fails
     */
    private void start() throws Exception {
        LOGGER.debug("Starting disruptor {}", (Object)this);
        Disruptor<ExchangeEvent> newDisruptor = this.createDisruptor();
        newDisruptor.start();
        if (this.executor != null) {
            // The disruptor only queued its tasks on the delayed executor; run them for real now.
            this.delayedExecutor.executeDelayedCommands(this.executor);
        }
        // Re-asserting the interrupt inside the retry loop would make an interruptible wait fail
        // again straight away and spin (awaitStarted is declared elsewhere; assumed to be a
        // standard interruptible timed wait). Remember the interrupt and restore it afterwards.
        boolean interrupted = false;
        try {
            for (LifecycleAwareExchangeEventHandler handler : this.handlers) {
                boolean eventHandlerStarted = false;
                while (!eventHandlerStarted) {
                    try {
                        // Bounded wait so a handler that never starts cannot hang the caller forever.
                        if (!handler.awaitStarted(10L, TimeUnit.SECONDS)) {
                            LOGGER.error("Disruptor/event handler failed to start properly, PLEASE REPORT");
                        }
                        eventHandlerStarted = true;
                    }
                    catch (InterruptedException e) {
                        LOGGER.info("Interrupted while waiting for the startup to complete");
                        interrupted = true;
                    }
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        this.publishBufferedExchanges(newDisruptor);
        this.disruptor.set(newDisruptor, false);
    }

    /**
     * Builds a new, not yet started disruptor and wires in the event handlers of all registered
     * endpoints. Resets and recomputes {@code uniqueConsumerCount} along the way.
     *
     * @return the configured disruptor
     * @throws Exception propagated from the endpoint or wait strategy calls
     */
    private Disruptor<ExchangeEvent> createDisruptor() throws Exception {
        // Raw construction kept as in the decompiled original; tasks go to the delayed executor.
        Disruptor created = new Disruptor(
                (EventFactory)ExchangeEventFactory.INSTANCE,
                this.size,
                (Executor)this.delayedExecutor,
                this.producerType.getProducerType(),
                this.waitStrategy.createWaitStrategyInstance());
        Collection<LifecycleAwareExchangeEventHandler> collected = new ArrayList<LifecycleAwareExchangeEventHandler>();
        this.uniqueConsumerCount = 0;
        for (DisruptorEndpoint endpoint : this.endpoints) {
            Map<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>> perConsumer = endpoint.createConsumerEventHandlers();
            if (perConsumer != null) {
                // One map key per consumer; each consumer may contribute several handlers.
                this.uniqueConsumerCount += perConsumer.size();
                for (Collection<LifecycleAwareExchangeEventHandler> consumerHandlers : perConsumer.values()) {
                    collected.addAll(consumerHandlers);
                }
            }
        }
        LOGGER.debug("Disruptor created with {} event handlers", collected.size());
        LifecycleAwareExchangeEventHandler[] handlerArray = collected.toArray(new LifecycleAwareExchangeEventHandler[0]);
        this.handleEventsWith((Disruptor<ExchangeEvent>)created, handlerArray);
        return created;
    }

    /**
     * Installs the given handlers on the disruptor and sizes the thread pool to match. When no
     * handlers are supplied, a single {@code BlockingExchangeEventHandler} is installed instead.
     */
    private void handleEventsWith(Disruptor<ExchangeEvent> newDisruptor, LifecycleAwareExchangeEventHandler[] newHandlers) {
        boolean hasHandlers = newHandlers != null && newHandlers.length > 0;
        if (hasHandlers) {
            this.handlers = newHandlers;
        } else {
            LifecycleAwareExchangeEventHandler[] placeholder = new LifecycleAwareExchangeEventHandler[1];
            placeholder[0] = new BlockingExchangeEventHandler();
            this.handlers = placeholder;
        }
        // One thread per installed handler.
        resizeThreadPoolExecutor(this.handlers.length);
        newDisruptor.handleEventsWith((EventHandler[])this.handlers);
    }

    /**
     * Drains the temporary exchange buffer and publishes the drained exchanges, in order, on the
     * ring buffer of the given disruptor.
     */
    private void publishBufferedExchanges(Disruptor<ExchangeEvent> newDisruptor) {
        // Phase 1: take a snapshot of everything buffered so far.
        ArrayList<Exchange> drained = new ArrayList<Exchange>(temporaryExchangeBuffer.size());
        Exchange next = temporaryExchangeBuffer.poll();
        while (next != null) {
            drained.add(next);
            next = temporaryExchangeBuffer.poll();
        }
        // Phase 2: publish the snapshot on the new ring buffer.
        RingBuffer<ExchangeEvent> target = newDisruptor.getRingBuffer();
        for (Exchange buffered : drained) {
            publishExchangeOnRingBuffer(buffered, target);
        }
    }

    /**
     * Brings the handler thread pool to {@code newSize} threads: creates it when missing, shuts it
     * down when {@code newSize <= 0}, resizes it in place when it is a {@link ThreadPoolExecutor},
     * and otherwise replaces it with a new fixed pool.
     *
     * @param newSize desired number of threads; zero or less means "no pool"
     */
    private void resizeThreadPoolExecutor(int newSize) {
        final boolean threadsWanted = newSize > 0;

        if (this.executor == null) {
            if (threadsWanted) {
                LOGGER.debug("Creating new executor with {} threads", newSize);
                this.executor = this.component.getCamelContext().getExecutorServiceManager().newFixedThreadPool((Object)this, this.uri, newSize);
            }
            return;
        }

        if (!threadsWanted) {
            LOGGER.debug("Shutting down executor");
            this.component.getCamelContext().getExecutorServiceManager().shutdown(this.executor);
            this.executor = null;
            return;
        }

        if (this.executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor)this.executor;
            LOGGER.debug("Resizing existing executor to {} threads", newSize);
            // Order the two setters so that core size never exceeds maximum size in between.
            if (newSize > pool.getCorePoolSize()) {
                pool.setMaximumPoolSize(newSize);
                pool.setCorePoolSize(newSize);
            } else {
                pool.setCorePoolSize(newSize);
                pool.setMaximumPoolSize(newSize);
            }
            return;
        }

        // Existing executor cannot be resized in place: replace it.
        LOGGER.debug("Shutting down old and creating new executor with {} threads", newSize);
        this.component.getCamelContext().getExecutorServiceManager().shutdown(this.executor);
        this.executor = this.component.getCamelContext().getExecutorServiceManager().newFixedThreadPool((Object)this, this.uri, newSize);
    }

    /**
     * Clears the disruptor reference, shuts the previous disruptor down (if any) and waits for
     * each of its handlers to report that it stopped. Runs while holding the lock.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is retried and the interrupt
     * status is restored once all waits have finished.
     *
     * @param isReconfiguring stored as the reference's mark; {@code true} makes publishers wait for
     *                        the next disruptor instead of failing
     */
    private void shutdownDisruptor(boolean isReconfiguring) {
        this.lock.lock();
        try {
            LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", (Object)this, (Object)isReconfiguring);
            Disruptor<ExchangeEvent> currentDisruptor = this.disruptor.getReference();
            this.disruptor.set(null, isReconfiguring);
            if (currentDisruptor != null) {
                LifecycleAwareExchangeEventHandler lifecycleAwareExchangeEventHandler;
                if (this.handlers != null && this.handlers.length == 1 && (lifecycleAwareExchangeEventHandler = this.handlers[0]) instanceof BlockingExchangeEventHandler) {
                    // Release the placeholder handler so it can move pending events to the temporary buffer.
                    BlockingExchangeEventHandler blockingExchangeEventHandler = (BlockingExchangeEventHandler)lifecycleAwareExchangeEventHandler;
                    blockingExchangeEventHandler.unblock();
                }
                currentDisruptor.shutdown();
                // Re-asserting the interrupt inside the retry loop would make an interruptible wait
                // fail again straight away and spin (awaitStopped is declared elsewhere; assumed to
                // be a standard interruptible timed wait). Remember it and restore it afterwards.
                boolean interrupted = false;
                try {
                    for (LifecycleAwareExchangeEventHandler eventHandler : this.handlers) {
                        boolean eventHandlerFinished = false;
                        while (!eventHandlerFinished) {
                            try {
                                // Bounded wait so a handler that never stops cannot hang the caller forever.
                                if (!eventHandler.awaitStopped(10L, TimeUnit.SECONDS)) {
                                    LOGGER.error("Disruptor/event handler failed to shut down properly, PLEASE REPORT");
                                }
                                eventHandlerFinished = true;
                            }
                            catch (InterruptedException e) {
                                LOGGER.info("Interrupted while waiting for the shutdown to complete");
                                interrupted = true;
                            }
                        }
                    }
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
                this.handlers = new LifecycleAwareExchangeEventHandler[0];
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Shuts the handler thread pool down by resizing it to zero threads, under the lock.
     */
    private void shutdownExecutor() {
        final Lock guard = lock;
        guard.lock();
        try {
            resizeThreadPoolExecutor(0);
        } finally {
            guard.unlock();
        }
    }

    /**
     * @return the name this reference was created with
     */
    public String getName() {
        return name;
    }

    /**
     * @return the remaining capacity reported by the current ring buffer
     * @throws DisruptorNotStartedException if no disruptor is running and none is pending
     */
    public long getRemainingCapacity() throws DisruptorNotStartedException {
        Disruptor<ExchangeEvent> current = getCurrentDisruptor();
        return current.getRingBuffer().remainingCapacity();
    }

    /**
     * @return the wait strategy this reference was created with
     */
    public DisruptorWaitStrategy getWaitStrategy() {
        return waitStrategy;
    }

    /**
     * @return the producer type this reference was created with
     */
    DisruptorProducerType getProducerType() {
        return producerType;
    }

    /**
     * @return the configured ring buffer size
     */
    public int getBufferSize() {
        return size;
    }

    /**
     * Counts exchanges not yet consumed: used ring buffer slots plus the temporary buffer. When no
     * disruptor is available, only the temporary buffer is counted.
     *
     * @return the number of pending exchanges
     */
    public int getPendingExchangeCount() {
        if (hasNullReference()) {
            return temporaryExchangeBuffer.size();
        }
        try {
            long usedSlots = (long)getBufferSize() - getRemainingCapacity();
            return (int)(usedSlots + (long)temporaryExchangeBuffer.size());
        } catch (DisruptorNotStartedException ignored) {
            // The disruptor went away after the check above; fall back to the buffered count.
            return temporaryExchangeBuffer.size();
        }
    }

    /**
     * Registers an endpoint with this reference, under the lock.
     *
     * @param disruptorEndpoint the endpoint to add
     */
    public void addEndpoint(DisruptorEndpoint disruptorEndpoint) {
        final Lock guard = lock;
        guard.lock();
        try {
            LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint);
            endpoints.add(disruptorEndpoint);
            int total = endpoints.size();
            LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, total);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Unregisters an endpoint, under the lock. When exactly one endpoint is registered at the time
     * of the call, the disruptor and the thread pool are shut down before the removal.
     *
     * @param disruptorEndpoint the endpoint to remove
     */
    public void removeEndpoint(DisruptorEndpoint disruptorEndpoint) {
        final Lock guard = lock;
        guard.lock();
        try {
            LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint);
            boolean onlyOneRegistered = getEndpointCount() == 1;
            if (onlyOneRegistered) {
                LOGGER.debug("Last Endpoint removed, shutdown disruptor");
                shutdownDisruptor(false);
                shutdownExecutor();
            }
            endpoints.remove((Object)disruptorEndpoint);
            int remaining = getEndpointCount();
            LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, remaining);
        } finally {
            guard.unlock();
        }
    }

    /**
     * @return the number of endpoints currently registered, read under the lock
     */
    public int getEndpointCount() {
        final Lock guard = lock;
        guard.lock();
        try {
            return endpoints.size();
        } finally {
            guard.unlock();
        }
    }

    /**
     * @return a description containing the URI, the endpoint count and the handler count
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DisruptorReference{uri='");
        text.append(this.uri);
        text.append("', endpoint count=").append(this.endpoints.size());
        text.append(", handler count=").append(this.handlers.length);
        text.append("}");
        return text.toString();
    }

    /**
     * Executor that does not run anything itself: submitted tasks are queued until
     * {@link #executeDelayedCommands(Executor)} forwards them to a real executor.
     */
    private static class DelayedExecutor
    implements Executor {
        private final Queue<Runnable> delayedCommands = new LinkedList<Runnable>();

        private DelayedExecutor() {
        }

        /** Queues the command for later execution. */
        @Override
        public void execute(Runnable command) {
            delayedCommands.offer(command);
        }

        /**
         * Hands every queued command, in submission order, to the given executor and empties the queue.
         *
         * @param actualExecutor the executor that will really run the commands
         */
        public void executeDelayedCommands(Executor actualExecutor) {
            for (Runnable next = delayedCommands.poll(); next != null; next = delayedCommands.poll()) {
                actualExecutor.execute(next);
            }
        }
    }

    /**
     * Placeholder handler that {@code handleEventsWith} installs when no other handlers exist.
     * It blocks on a latch until {@link #unblock()} is called, then moves each event's original
     * exchange into the enclosing reference's temporary buffer, unless the exchange carries the
     * {@code disruptor.ignoreExchange} property.
     */
    private class BlockingExchangeEventHandler
    extends AbstractLifecycleAwareExchangeEventHandler {
        private final CountDownLatch blockingLatch = new CountDownLatch(1);

        private BlockingExchangeEventHandler() {
        }

        /**
         * Waits for the latch, then buffers the event's exchange for re-publication.
         *
         * @throws Exception e.g. {@link InterruptedException} when interrupted while waiting on the latch
         */
        @Override
        public void onEvent(ExchangeEvent event, long sequence, boolean endOfBatch) throws Exception {
            this.blockingLatch.await();
            Exchange exchange = event.getSynchronizedExchange().cancelAndGetOriginalExchange();
            boolean ignoreExchange = (Boolean)exchange.getProperty("disruptor.ignoreExchange", (Object)false, Boolean.TYPE);
            if (ignoreExchange) {
                LOGGER.trace("Ignoring exchange {}", (Object)exchange);
            } else if (!DisruptorReference.this.temporaryExchangeBuffer.offer(exchange)) {
                // The temporary buffer is bounded; offer() returns false instead of blocking when
                // it is full. Make the loss visible rather than dropping the exchange silently.
                LOGGER.warn("Temporary exchange buffer is full, dropping exchange {}", (Object)exchange);
            }
        }

        /** Releases the latch so that pending and future events are processed. */
        public void unblock() {
            this.blockingLatch.countDown();
        }
    }
}

