/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.disruptor;

import com.lmax.disruptor.InsufficientCapacityException;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.StreamCache;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.component.disruptor.DisruptorEndpoint;
import org.apache.camel.component.disruptor.DisruptorNotStartedException;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.SynchronizationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorProducer
extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorProducer.class);
    private final WaitForTaskToComplete waitForTaskToComplete;
    private final long timeout;
    private final DisruptorEndpoint endpoint;
    private final boolean blockWhenFull;

    public DisruptorProducer(DisruptorEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        super((Endpoint)endpoint);
        this.waitForTaskToComplete = waitForTaskToComplete;
        this.timeout = timeout;
        this.endpoint = endpoint;
        this.blockWhenFull = blockWhenFull;
    }

    public DisruptorEndpoint getEndpoint() {
        return this.endpoint;
    }

    protected void doStart() throws Exception {
        this.getEndpoint().onStarted(this);
    }

    protected void doStop() throws Exception {
        this.getEndpoint().onStopped(this);
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        block12: {
            WaitForTaskToComplete wait = this.waitForTaskToComplete;
            if (exchange.getProperty("CamelAsyncWait") != null) {
                wait = (WaitForTaskToComplete)exchange.getProperty("CamelAsyncWait", WaitForTaskToComplete.class);
            }
            try {
                if (wait == WaitForTaskToComplete.Always || wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable((Exchange)exchange)) {
                    Exchange copy = this.prepareCopy(exchange, false);
                    CountDownLatch latch = new CountDownLatch(1);
                    copy.getExchangeExtension().addOnCompletion((Synchronization)this.newOnCompletion(exchange, latch));
                    this.doPublish(copy);
                    if (this.timeout > 0L) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Waiting for task to complete using timeout (ms): {} at [{}]", (Object)this.timeout, (Object)this.endpoint.getEndpointUri());
                        }
                        boolean done = false;
                        try {
                            done = latch.await(this.timeout, TimeUnit.MILLISECONDS);
                        }
                        catch (InterruptedException e) {
                            LOG.info("Interrupted while waiting for the task to complete");
                            Thread.currentThread().interrupt();
                        }
                        if (!done) {
                            exchange.setProperty("disruptor.ignoreExchange", (Object)true);
                            exchange.setException((Throwable)new ExchangeTimedOutException(exchange, this.timeout));
                            latch.countDown();
                        }
                        break block12;
                    }
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Waiting for task to complete (blocking) at [{}]", (Object)this.endpoint.getEndpointUri());
                    }
                    try {
                        latch.await();
                    }
                    catch (InterruptedException e) {
                        LOG.info("Interrupted while waiting for the task to complete");
                        Thread.currentThread().interrupt();
                    }
                    break block12;
                }
                Exchange copy = this.prepareCopy(exchange, true);
                this.doPublish(copy);
            }
            catch (Exception e) {
                exchange.setException((Throwable)e);
            }
        }
        callback.done(true);
        return true;
    }

    private SynchronizationAdapter newOnCompletion(final Exchange exchange, final CountDownLatch latch) {
        return new SynchronizationAdapter(){

            public void onDone(Exchange response) {
                if (latch.getCount() == 0L) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{}. Timeout occurred so response will be ignored: {}", (Object)this, (Object)response.getMessage());
                    }
                } else {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{} with response: {}", (Object)this, (Object)response.getMessage());
                    }
                    try {
                        ExchangeHelper.copyResults((Exchange)exchange, (Exchange)response);
                    }
                    finally {
                        latch.countDown();
                    }
                }
            }

            public boolean allowHandover() {
                return false;
            }

            public String toString() {
                return "onDone at endpoint: " + String.valueOf((Object)DisruptorProducer.this.endpoint);
            }
        };
    }

    private void doPublish(Exchange exchange) {
        LOG.trace("Publishing Exchange to disruptor ringbuffer: {}", (Object)exchange);
        try {
            if (this.blockWhenFull) {
                this.endpoint.publish(exchange);
            } else {
                this.endpoint.tryPublish(exchange);
            }
        }
        catch (DisruptorNotStartedException e) {
            throw new IllegalStateException("Disruptor was not started", e);
        }
        catch (InsufficientCapacityException e) {
            throw new IllegalStateException("Disruptors ringbuffer was full", e);
        }
    }

    private Exchange prepareCopy(Exchange exchange, boolean copy) throws IOException {
        StreamCache sc;
        StreamCache newBody;
        Object object;
        Exchange target = ExchangeHelper.createCorrelatedCopy((Exchange)exchange, (boolean)copy);
        target.getExchangeExtension().setFromEndpoint((Endpoint)this.endpoint);
        if (copy && (object = target.getMessage().getBody()) instanceof StreamCache && (newBody = (sc = (StreamCache)object).copy(target)) != null) {
            target.getMessage().setBody((Object)newBody);
        }
        return target;
    }
}

