/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.SerializableFoo;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.binding.StreamListenerMessageHandler;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.converter.JavaSerializationMessageConverter;
import org.springframework.cloud.stream.converter.KryoMessageConverter;
import org.springframework.cloud.stream.converter.MessageConverterUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;

public abstract class AbstractBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties> {
    protected final Log logger = LogFactory.getLog(this.getClass());
    protected B testBinder;
    protected SmartMessageConverter messageConverter;
    protected double timeoutMultiplier = 1.0;

    @Before
    public void before() {
        this.messageConverter = new CompositeMessageConverterFactory().getMessageConverterForAllRegistered();
    }

    protected Message<?> receive(PollableChannel channel) {
        return this.receive(channel, 1);
    }

    protected Message<?> receive(PollableChannel channel, int additionalMultiplier) {
        long startTime = System.currentTimeMillis();
        Message receive = channel.receive((long)((int)(1000.0 * this.timeoutMultiplier * (double)additionalMultiplier)));
        long elapsed = System.currentTimeMillis() - startTime;
        this.logger.debug((Object)("receive() took " + elapsed / 1000L + " seconds"));
        return receive;
    }

    @Test
    public void testClean() throws Exception {
        B binder = this.getBinder();
        Binding foo0ProducerBinding = binder.bindProducer(String.format("foo%s0", this.getDestinationNameDelimiter()), (Object)this.createBindableChannel("output", new BindingProperties()), this.createProducerProperties());
        Binding foo0ConsumerBinding = binder.bindConsumer(String.format("foo%s0", this.getDestinationNameDelimiter()), "testClean", (Object)this.createBindableChannel("input", new BindingProperties()), this.createConsumerProperties());
        Binding foo1ProducerBinding = binder.bindProducer(String.format("foo%s1", this.getDestinationNameDelimiter()), (Object)this.createBindableChannel("output", new BindingProperties()), this.createProducerProperties());
        Binding foo1ConsumerBinding = binder.bindConsumer(String.format("foo%s1", this.getDestinationNameDelimiter()), "testClean", (Object)this.createBindableChannel("input", new BindingProperties()), this.createConsumerProperties());
        Binding foo2ProducerBinding = binder.bindProducer(String.format("foo%s2", this.getDestinationNameDelimiter()), (Object)this.createBindableChannel("output", new BindingProperties()), this.createProducerProperties());
        foo0ProducerBinding.unbind();
        Assertions.assertThat((boolean)TestUtils.getPropertyValue(foo0ProducerBinding, "lifecycle", Lifecycle.class).isRunning()).isFalse();
        foo0ConsumerBinding.unbind();
        foo1ProducerBinding.unbind();
        Assertions.assertThat((boolean)TestUtils.getPropertyValue(foo0ConsumerBinding, "lifecycle", Lifecycle.class).isRunning()).isFalse();
        Assertions.assertThat((boolean)TestUtils.getPropertyValue(foo1ProducerBinding, "lifecycle", Lifecycle.class).isRunning()).isFalse();
        foo1ConsumerBinding.unbind();
        foo2ProducerBinding.unbind();
        Assertions.assertThat((boolean)TestUtils.getPropertyValue(foo1ConsumerBinding, "lifecycle", Lifecycle.class).isRunning()).isFalse();
        Assertions.assertThat((boolean)TestUtils.getPropertyValue(foo2ProducerBinding, "lifecycle", Lifecycle.class).isRunning()).isFalse();
    }

    @Test
    public void testSendAndReceive() throws Exception {
        B binder = this.getBinder();
        BindingProperties outputBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", outputBindingProperties);
        BindingProperties inputBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", inputBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("foo%s0", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, outputBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("foo%s0", this.getDestinationNameDelimiter()), "testSendAndReceive", (Object)moduleInputChannel, inputBindingProperties.getConsumer());
        Message message = MessageBuilder.withPayload((Object)"foo").setHeader("contentType", (Object)"text/plain").build();
        this.binderBindUnbindLatency();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference inboundMessageRef = new AtomicReference();
        moduleInputChannel.subscribe(message1 -> {
            try {
                inboundMessageRef.set(message1);
            }
            finally {
                latch.countDown();
            }
        });
        moduleOutputChannel.send(message);
        Assert.isTrue((boolean)latch.await(5L, TimeUnit.SECONDS), (String)"Failed to receive message");
        Assertions.assertThat((byte[])((byte[])((Message)inboundMessageRef.get()).getPayload())).isEqualTo((Object)"foo".getBytes());
        Assertions.assertThat((Object)((Message)inboundMessageRef.get()).getHeaders().get((Object)"originalContentType")).isNull();
        Assertions.assertThat((String)((Message)inboundMessageRef.get()).getHeaders().get((Object)"contentType").toString()).isEqualTo((Object)"text/plain");
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    @Test
    public void testSendAndReceiveKryo() throws Exception {
        B binder = this.getBinder();
        BindingProperties outputBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", outputBindingProperties);
        BindingProperties inputBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", inputBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("foo%s0x", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, outputBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("foo%s0x", this.getDestinationNameDelimiter()), "testSendAndReceiveKryo", (Object)moduleInputChannel, inputBindingProperties.getConsumer());
        Foo foo = new Foo();
        foo.setName("Bill");
        Message message = MessageBuilder.withPayload((Object)foo).setHeader("contentType", (Object)MessageConverterUtils.X_JAVA_OBJECT).build();
        this.binderBindUnbindLatency();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference inboundMessageRef = new AtomicReference();
        moduleInputChannel.subscribe(message1 -> {
            try {
                inboundMessageRef.set(message1);
            }
            finally {
                latch.countDown();
            }
        });
        moduleOutputChannel.send(message);
        Assert.isTrue((boolean)latch.await(5L, TimeUnit.SECONDS), (String)"Failed to receive message");
        KryoMessageConverter kryo = new KryoMessageConverter(null, true);
        Foo fooPayload = (Foo)kryo.fromMessage((Message)inboundMessageRef.get(), Foo.class);
        Assertions.assertThat((Object)fooPayload).isNotNull();
        Assertions.assertThat((Object)((Message)inboundMessageRef.get()).getHeaders().get((Object)"originalContentType")).isNull();
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    @Test
    public void testSendAndReceiveJavaSerialization() throws Exception {
        B binder = this.getBinder();
        BindingProperties outputBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", outputBindingProperties);
        BindingProperties inputBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", inputBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("foo%s0y", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, outputBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("foo%s0y", this.getDestinationNameDelimiter()), "testSendAndReceiveJavaSerialization", (Object)moduleInputChannel, inputBindingProperties.getConsumer());
        SerializableFoo foo = new SerializableFoo();
        Message message = MessageBuilder.withPayload((Object)foo).setHeader("contentType", (Object)MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT).build();
        this.binderBindUnbindLatency();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference inboundMessageRef = new AtomicReference();
        moduleInputChannel.subscribe(message1 -> {
            try {
                inboundMessageRef.set(message1);
            }
            finally {
                latch.countDown();
            }
        });
        moduleOutputChannel.send(message);
        Assert.isTrue((boolean)latch.await(5L, TimeUnit.SECONDS), (String)"Failed to receive message");
        JavaSerializationMessageConverter converter = new JavaSerializationMessageConverter();
        SerializableFoo serializableFoo = (SerializableFoo)converter.convertFromInternal((Message)inboundMessageRef.get(), SerializableFoo.class, null);
        Assertions.assertThat((Object)serializableFoo).isNotNull();
        Assertions.assertThat((Object)((Message)inboundMessageRef.get()).getHeaders().get((Object)"originalContentType")).isNull();
        Assertions.assertThat((Object)((Message)inboundMessageRef.get()).getHeaders().get((Object)"contentType")).isEqualTo((Object)MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT);
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    @Test
    public void testSendAndReceiveMultipleTopics() throws Exception {
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel1 = this.createBindableChannel("output1", producerBindingProperties);
        DirectChannel moduleOutputChannel2 = this.createBindableChannel("output2", producerBindingProperties);
        QueueChannel moduleInputChannel = new QueueChannel();
        Binding producerBinding1 = binder.bindProducer(String.format("foo%sxy", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel1, producerBindingProperties.getProducer());
        Binding producerBinding2 = binder.bindProducer(String.format("foo%syz", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel2, producerBindingProperties.getProducer());
        Binding consumerBinding1 = binder.bindConsumer(String.format("foo%sxy", this.getDestinationNameDelimiter()), "testSendAndReceiveMultipleTopics", (Object)moduleInputChannel, this.createConsumerProperties());
        Binding consumerBinding2 = binder.bindConsumer(String.format("foo%syz", this.getDestinationNameDelimiter()), "testSendAndReceiveMultipleTopics", (Object)moduleInputChannel, this.createConsumerProperties());
        String testPayload1 = "foo" + UUID.randomUUID().toString();
        Message message1 = MessageBuilder.withPayload((Object)testPayload1.getBytes()).setHeader("contentType", (Object)MimeTypeUtils.APPLICATION_OCTET_STREAM).build();
        String testPayload2 = "foo" + UUID.randomUUID().toString();
        Message message2 = MessageBuilder.withPayload((Object)testPayload2.getBytes()).setHeader("contentType", (Object)MimeTypeUtils.APPLICATION_OCTET_STREAM).build();
        this.binderBindUnbindLatency();
        moduleOutputChannel1.send(message1);
        moduleOutputChannel2.send(message2);
        Object[] messages = new Message[]{this.receive((PollableChannel)moduleInputChannel), this.receive((PollableChannel)moduleInputChannel)};
        Assertions.assertThat((Object)messages[0]).isNotNull();
        Assertions.assertThat((Object)messages[1]).isNotNull();
        Assertions.assertThat((Object[])messages).extracting("payload").containsExactlyInAnyOrder(new Object[]{testPayload1.getBytes(), testPayload2.getBytes()});
        producerBinding1.unbind();
        producerBinding2.unbind();
        consumerBinding1.unbind();
        consumerBinding2.unbind();
    }

    @Test
    public void testSendAndReceiveNoOriginalContentType() throws Exception {
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties inputBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", inputBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bar%s0", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bar%s0", this.getDestinationNameDelimiter()), "testSendAndReceiveNoOriginalContentType", (Object)moduleInputChannel, this.createConsumerProperties());
        this.binderBindUnbindLatency();
        Message message = MessageBuilder.withPayload((Object)"foo").setHeader("contentType", (Object)MimeTypeUtils.TEXT_PLAIN).build();
        moduleOutputChannel.send(message);
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference inboundMessageRef = new AtomicReference();
        moduleInputChannel.subscribe(message1 -> {
            try {
                inboundMessageRef.set(message1);
            }
            finally {
                latch.countDown();
            }
        });
        moduleOutputChannel.send(message);
        Assert.isTrue((boolean)latch.await(5L, TimeUnit.SECONDS), (String)"Failed to receive message");
        Assertions.assertThat(inboundMessageRef.get()).isNotNull();
        Assertions.assertThat((byte[])((byte[])((Message)inboundMessageRef.get()).getPayload())).isEqualTo((Object)"foo".getBytes());
        Assertions.assertThat((String)((Message)inboundMessageRef.get()).getHeaders().get((Object)"contentType").toString()).isEqualTo((Object)"text/plain");
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    protected abstract B getBinder() throws Exception;

    protected abstract CP createConsumerProperties();

    protected abstract PP createProducerProperties();

    protected final BindingProperties createConsumerBindingProperties(CP consumerProperties) {
        BindingProperties bindingProperties = new BindingProperties();
        bindingProperties.setConsumer(consumerProperties);
        return bindingProperties;
    }

    protected BindingProperties createProducerBindingProperties(PP producerProperties) {
        BindingProperties bindingProperties = new BindingProperties();
        bindingProperties.setProducer(producerProperties);
        return bindingProperties;
    }

    protected DirectChannel createBindableChannel(String channelName, BindingProperties bindingProperties) throws Exception {
        return this.createBindableChannel(channelName, bindingProperties, channelName.contains("input"));
    }

    protected DirectChannel createBindableChannel(String channelName, BindingProperties bindingProperties, boolean inputChannel) throws Exception {
        MessageConverterConfigurer messageConverterConfigurer = this.createConverterConfigurer(channelName, bindingProperties);
        DirectChannel channel = new DirectChannel();
        channel.setBeanName(channelName);
        if (inputChannel) {
            messageConverterConfigurer.configureInputChannel((MessageChannel)channel, channelName);
        } else {
            messageConverterConfigurer.configureOutputChannel((MessageChannel)channel, channelName);
        }
        return channel;
    }

    protected DefaultPollableMessageSource createBindableMessageSource(String bindingName, BindingProperties bindingProperties) throws Exception {
        DefaultPollableMessageSource source = new DefaultPollableMessageSource((SmartMessageConverter)new CompositeMessageConverterFactory().getMessageConverterForAllRegistered());
        this.createConverterConfigurer(bindingName, bindingProperties).configurePolledMessageSource((PollableMessageSource)source, bindingName);
        return source;
    }

    private MessageConverterConfigurer createConverterConfigurer(String channelName, BindingProperties bindingProperties) throws Exception {
        BindingServiceProperties bindingServiceProperties = new BindingServiceProperties();
        bindingServiceProperties.getBindings().put(channelName, bindingProperties);
        GenericApplicationContext applicationContext = new GenericApplicationContext();
        applicationContext.refresh();
        bindingServiceProperties.setApplicationContext((ApplicationContext)applicationContext);
        bindingServiceProperties.setConversionService((ConversionService)new DefaultConversionService());
        bindingServiceProperties.afterPropertiesSet();
        MessageConverterConfigurer messageConverterConfigurer = new MessageConverterConfigurer(bindingServiceProperties, new CompositeMessageConverterFactory(null, null));
        messageConverterConfigurer.setBeanFactory((BeanFactory)applicationContext.getBeanFactory());
        return messageConverterConfigurer;
    }

    @After
    public void cleanup() {
        if (this.testBinder != null) {
            ((AbstractTestBinder)this.testBinder).cleanup();
        }
    }

    protected void binderBindUnbindLatency() throws InterruptedException {
    }

    public abstract Spy spyOn(String var1);

    protected String getDestinationNameDelimiter() {
        return ".";
    }

    @Test
    public void testSendPojoReceivePojoWithStreamListenerDefaultContentType() throws Exception {
        StreamListenerMessageHandler handler = this.buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties consumerBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", consumerBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bad%s0a", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bad%s0a", this.getDestinationNameDelimiter()), "test-1", (Object)moduleInputChannel, consumerBindingProperties.getConsumer());
        Station station = new Station();
        Message message = MessageBuilder.withPayload((Object)station).build();
        moduleInputChannel.subscribe((MessageHandler)handler);
        moduleOutputChannel.send(message);
        QueueChannel replyChannel = (QueueChannel)handler.getOutputChannel();
        Message replyMessage = replyChannel.receive(5000L);
        Assertions.assertThat((boolean)(replyMessage.getPayload() instanceof Station)).isTrue();
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    @Test
    public void testSendPojoReceivePojoKryoWithStreamListener() throws Exception {
        StreamListenerMessageHandler handler = this.buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties consumerBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", consumerBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bad%s0b", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bad%s0b", this.getDestinationNameDelimiter()), "test-2", (Object)moduleInputChannel, consumerBindingProperties.getConsumer());
        Station station = new Station();
        Message message = MessageBuilder.withPayload((Object)station).setHeader("contentType", (Object)MessageConverterUtils.X_JAVA_OBJECT).build();
        moduleInputChannel.subscribe((MessageHandler)handler);
        moduleOutputChannel.send(message);
        QueueChannel replyChannel = (QueueChannel)handler.getOutputChannel();
        Message replyMessage = replyChannel.receive(5000L);
        Assertions.assertThat((boolean)(replyMessage.getPayload() instanceof Station)).isTrue();
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=MessageDeliveryException.class)
    public void testStreamListenerJavaSerializationNonSerializable() throws Exception {
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties consumerBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", consumerBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bad%s0c", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bad%s0c", this.getDestinationNameDelimiter()), "test-3", (Object)moduleInputChannel, consumerBindingProperties.getConsumer());
        try {
            Station station = new Station();
            Message message = MessageBuilder.withPayload((Object)station).setHeader("contentType", (Object)MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT).build();
            moduleOutputChannel.send(message);
        }
        finally {
            producerBinding.unbind();
            consumerBinding.unbind();
        }
    }

    @Test
    public void testSendJsonReceivePojoWithStreamListener() throws Exception {
        StreamListenerMessageHandler handler = this.buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties consumerBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", consumerBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bad%s0d", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bad%s0d", this.getDestinationNameDelimiter()), "test-4", (Object)moduleInputChannel, consumerBindingProperties.getConsumer());
        String value = "{\"readings\":[{\"stationid\":\"fgh\",\"customerid\":\"12345\",\"timestamp\":null},{\"stationid\":\"hjk\",\"customerid\":\"222\",\"timestamp\":null}]}";
        Message message = MessageBuilder.withPayload((Object)value).setHeader("contentType", (Object)MimeTypeUtils.APPLICATION_JSON).build();
        moduleInputChannel.subscribe((MessageHandler)handler);
        moduleOutputChannel.send(message);
        QueueChannel channel = (QueueChannel)handler.getOutputChannel();
        Message reply = channel.receive(5000L);
        Assertions.assertThat((Object)reply).isNotNull();
        Assertions.assertThat((boolean)(reply.getPayload() instanceof Station)).isTrue();
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    @Test
    public void testSendJsonReceiveJsonWithStreamListener() throws Exception {
        StreamListenerMessageHandler handler = this.buildStreamListener(AbstractBinderTests.class, "echoStationString", String.class);
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties consumerBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", consumerBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bad%s0e", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bad%s0e", this.getDestinationNameDelimiter()), "test-5", (Object)moduleInputChannel, consumerBindingProperties.getConsumer());
        String value = "{\"readings\":[{\"stationid\":\"fgh\",\"customerid\":\"12345\",\"timestamp\":null},{\"stationid\":\"hjk\",\"customerid\":\"222\",\"timestamp\":null}]}";
        Message message = MessageBuilder.withPayload((Object)value).setHeader("contentType", (Object)MimeTypeUtils.APPLICATION_JSON).build();
        moduleInputChannel.subscribe((MessageHandler)handler);
        moduleOutputChannel.send(message);
        QueueChannel channel = (QueueChannel)handler.getOutputChannel();
        Message reply = channel.receive(5000L);
        Assertions.assertThat((Object)reply).isNotNull();
        Assertions.assertThat((boolean)(reply.getPayload() instanceof String)).isTrue();
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    @Test
    public void testSendPojoReceivePojoWithStreamListener() throws Exception {
        StreamListenerMessageHandler handler = this.buildStreamListener(AbstractBinderTests.class, "echoStation", Station.class);
        B binder = this.getBinder();
        BindingProperties producerBindingProperties = this.createProducerBindingProperties(this.createProducerProperties());
        DirectChannel moduleOutputChannel = this.createBindableChannel("output", producerBindingProperties);
        BindingProperties consumerBindingProperties = this.createConsumerBindingProperties(this.createConsumerProperties());
        DirectChannel moduleInputChannel = this.createBindableChannel("input", consumerBindingProperties);
        Binding producerBinding = binder.bindProducer(String.format("bad%s0f", this.getDestinationNameDelimiter()), (Object)moduleOutputChannel, producerBindingProperties.getProducer());
        Binding consumerBinding = binder.bindConsumer(String.format("bad%s0f", this.getDestinationNameDelimiter()), "test-6", (Object)moduleInputChannel, consumerBindingProperties.getConsumer());
        Station.Readings r1 = new Station.Readings();
        r1.setCustomerid("123");
        r1.setStationid("XYZ");
        Station.Readings r2 = new Station.Readings();
        r2.setCustomerid("546");
        r2.setStationid("ABC");
        Station station = new Station();
        station.setReadings(Arrays.asList(r1, r2));
        Message message = MessageBuilder.withPayload((Object)station).setHeader("contentType", (Object)MimeTypeUtils.APPLICATION_JSON).build();
        moduleInputChannel.subscribe((MessageHandler)handler);
        moduleOutputChannel.send(message);
        QueueChannel channel = (QueueChannel)handler.getOutputChannel();
        Message reply = channel.receive(5000L);
        Assertions.assertThat((Object)reply).isNotNull();
        Assertions.assertThat((boolean)(reply.getPayload() instanceof Station)).isTrue();
        producerBinding.unbind();
        consumerBinding.unbind();
    }

    private Station echoStation(Station station) {
        return station;
    }

    private String echoStationString(String station) {
        return station;
    }

    private StreamListenerMessageHandler buildStreamListener(Class<?> handlerClass, String handlerMethodName, Class<?> ... parameters) throws Exception {
        String channelName = "reply_" + System.nanoTime();
        GenericApplicationContext context = new GenericApplicationContext();
        context.getBeanFactory().registerSingleton(channelName, (Object)new QueueChannel());
        Method m = ReflectionUtils.findMethod(handlerClass, (String)handlerMethodName, (Class[])parameters);
        InvocableHandlerMethod method = new InvocableHandlerMethod((Object)this, m);
        HandlerMethodArgumentResolverComposite resolver = new HandlerMethodArgumentResolverComposite();
        CompositeMessageConverterFactory factory = new CompositeMessageConverterFactory();
        resolver.addResolver((HandlerMethodArgumentResolver)new PayloadArgumentResolver((MessageConverter)factory.getMessageConverterForAllRegistered()));
        method.setMessageMethodArgumentResolvers(resolver);
        Constructor c = ReflectionUtils.accessibleConstructor(StreamListenerMessageHandler.class, (Class[])new Class[]{InvocableHandlerMethod.class, Boolean.TYPE, String[].class});
        StreamListenerMessageHandler handler = (StreamListenerMessageHandler)c.newInstance(method, false, new String[0]);
        handler.setOutputChannelName(channelName);
        handler.setBeanFactory((BeanFactory)context);
        handler.afterPropertiesSet();
        context.refresh();
        return handler;
    }

    private class Foo {
        private String name;

        private Foo() {
        }

        public String getName() {
            return this.name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

    public static class Station {
        List<Readings> readings = new ArrayList<Readings>();

        public List<Readings> getReadings() {
            return this.readings;
        }

        public void setReadings(List<Readings> readings) {
            this.readings = readings;
        }

        public static class Readings
        implements Serializable {
            public String stationid;
            public String customerid;
            public String timestamp;

            public String getStationid() {
                return this.stationid;
            }

            public void setStationid(String stationid) {
                this.stationid = stationid;
            }

            public String getCustomerid() {
                return this.customerid;
            }

            public void setCustomerid(String customerid) {
                this.customerid = customerid;
            }

            public String getTimestamp() {
                return this.timestamp;
            }

            public void setTimestamp(String timestamp) {
                this.timestamp = timestamp;
            }
        }
    }
}

