/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Header;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.ProcessorType;
import org.apache.camel.processor.aggregate.AggregationStrategy;

/**
 * Tests multicasting a message to three branches and aggregating the replies.
 *
 * <p>A message with body {@code "input"} is sent to either {@code direct:parallel} or
 * {@code direct:sequential}. Both routes multicast to {@code direct:x}, {@code direct:y} and
 * {@code direct:z}; each branch appends its own suffix to the in body and forwards the exchange
 * to {@code direct:aggregater}, which feeds {@code mock:result}. The test expects
 * {@code "inputx+inputy+inputz"} both on the mock endpoint and as the out body of the exchange
 * returned to the sender.
 */
public class MultiCastAggregatorTest extends ContextTestSupport {
    // Not referenced anywhere in this class; kept because it is protected and may be
    // used by subclasses.
    protected Endpoint<Exchange> startEndpoint;

    /** The {@code mock:result} endpoint, resolved in {@link #setUp()}. */
    protected MockEndpoint result;

    /** Runs the scenario through the {@code direct:parallel} route. */
    public void testMulticastReceivesItsOwnExchangeParallelly() throws Exception {
        sendingAMessageUsingMulticastReceivesItsOwnExchange(true);
    }

    /** Runs the scenario through the {@code direct:sequential} route. */
    public void testMulticastReceivesItsOwnExchangeSequentially() throws Exception {
        sendingAMessageUsingMulticastReceivesItsOwnExchange(false);
    }

    /**
     * Sends {@code "input"} (with header {@code foo=bar}) to the selected route and verifies
     * the aggregated result.
     *
     * @param isParallel {@code true} to send to {@code direct:parallel}, {@code false} to send
     *                   to {@code direct:sequential}
     * @throws Exception if sending fails or the mock endpoint expectations are not met
     */
    private void sendingAMessageUsingMulticastReceivesItsOwnExchange(boolean isParallel) throws Exception {
        result.expectedBodiesReceived("inputx+inputy+inputz");

        String url = isParallel ? "direct:parallel" : "direct:sequential";

        // The returned value must be typed as Exchange: the assertions below call getOut() on
        // it, which does not exist on Object.
        Exchange exchange = template.send(url, new Processor() {

            public void process(Exchange request) {
                Message in = request.getIn();
                in.setBody("input");
                in.setHeader("foo", "bar");
            }
        });

        assertNotNull("We should get result here", exchange);
        String actualBody = exchange.getOut().getBody(String.class);
        assertEquals("Can't get the right result", "inputx+inputy+inputz", actualBody);

        // Method name spelling is that of the inherited test-support API.
        assertMockEndpointsSatisifed();
    }

    /** Performs the parent set-up and then looks up the {@code mock:result} endpoint. */
    protected void setUp() throws Exception {
        super.setUp();
        result = getMockEndpoint("mock:result");
    }

    /**
     * Builds the routes under test: two multicast entry points, three appending branches and
     * one aggregating route that ends in {@code mock:result}.
     *
     * @return the route builder defining all routes used by this test
     */
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {

            public void configure() {
                // Entry points. Both fan out to the same three branches and merge the replies
                // with BodyOutAggregatingStrategy.
                // NOTE(review): the boolean passed to multicast(...) is presumably the
                // parallel-processing switch - confirm against the DSL.
                from("direct:parallel").multicast(new BodyOutAggregatingStrategy(), true).to("direct:x", "direct:y", "direct:z");
                from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");

                // Branches: append a suffix to the in body, then hand over to the aggregator.
                // The ProcessorType casts come from the decompiled output and are kept because
                // the generic return type of process(...) is not visible from this file.
                ((ProcessorType)from("direct:x").process(new AppendingProcessor("x"))).to("direct:aggregater");
                ((ProcessorType)from("direct:y").process(new AppendingProcessor("y"))).to("direct:aggregater");
                ((ProcessorType)from("direct:z").process(new AppendingProcessor("z"))).to("direct:aggregater");

                // NOTE(review): the test message never sets a "cheese" header, and
                // BodyInAggregatingStrategy records its count as an exchange *property* named
                // "aggregated" while this predicate reads a *header* of that name - confirm
                // that this is the intended completion condition.
                from("direct:aggregater").aggregator(header("cheese"), new BodyInAggregatingStrategy()).completedPredicate(header("aggregated").isEqualTo(3)).to("mock:result");
            }
        };
    }

    /**
     * Aggregation strategy that joins the in bodies of two exchanges with {@code "+"} and keeps
     * a running count in the {@code "aggregated"} exchange property.
     */
    class BodyInAggregatingStrategy
    implements AggregationStrategy {

        /**
         * Merges {@code oldExchange} into {@code newExchange}.
         *
         * @param oldExchange the previously aggregated exchange
         * @param newExchange the newly arrived exchange; its in body is overwritten with
         *                    {@code oldBody + "+" + newBody}
         * @return {@code newExchange}, carrying the combined body and the updated count
         */
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Message newIn = newExchange.getIn();
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newIn.getBody(String.class);
            newIn.setBody(oldBody + "+" + newBody);

            // An exchange without a count is treated as a single message, so the first merge
            // yields 2.
            Integer count = (Integer)oldExchange.getProperty("aggregated");
            if (count == null) {
                count = 1;
            }
            newExchange.setProperty("aggregated", count + 1);
            return newExchange;
        }

        /**
         * Reports whether the supplied count equals three.
         *
         * @param aggregated value bound from the {@code "aggregated"} header; may be
         *                   {@code null}
         * @return {@code true} only when {@code aggregated} is non-null and equal to 3
         */
        public boolean isCompleted(@Header(name="aggregated") Integer aggregated) {
            if (aggregated == null) {
                return false;
            }
            return aggregated == 3;
        }
    }

    /**
     * Aggregation strategy that joins the out bodies of two exchanges with {@code "+"}.
     */
    class BodyOutAggregatingStrategy
    implements AggregationStrategy {

        /**
         * Merges {@code oldExchange} into {@code newExchange}.
         *
         * @param oldExchange the previously aggregated exchange
         * @param newExchange the newly arrived exchange; its out body is overwritten with
         *                    {@code oldBody + "+" + newBody}
         * @return {@code newExchange}, carrying the combined out body
         */
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Message newOut = newExchange.getOut();
            String oldBody = oldExchange.getOut().getBody(String.class);
            String newBody = newOut.getBody(String.class);
            newOut.setBody(oldBody + "+" + newBody);
            return newExchange;
        }
    }

    /**
     * Processor that appends a fixed suffix to the in body of every exchange it handles.
     */
    class AppendingProcessor
    implements Processor {
        /** Suffix appended to the in body. */
        String appendingString;

        /**
         * @param string the suffix to append to each in body
         */
        AppendingProcessor(String string) {
            this.appendingString = string;
        }

        /**
         * Replaces the in body with the current body (read as a String) followed by the
         * suffix.
         *
         * @param exchange the exchange whose in body is rewritten
         */
        public void process(Exchange exchange) {
            Message in = exchange.getIn();
            String body = in.getBody(String.class);
            in.setBody(body + this.appendingString);
        }
    }
}

