/*
 * Decompiled with CFR 0.152.
 */
package com.vmware.transport.bus;

import com.vmware.transport.bus.EventBus;
import com.vmware.transport.bus.MessageResponder;
import com.vmware.transport.bus.model.Message;
import com.vmware.transport.bus.model.MessageObjectHandlerConfig;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import java.util.function.Function;

public class MessageResponderImpl<T>
implements MessageResponder<T> {
    private EventBus bus;
    private MessageObjectHandlerConfig config;
    private Observable<Message> channel;
    private Disposable sub;

    public MessageResponderImpl(MessageObjectHandlerConfig config, EventBus bus) {
        this.config = config;
        this.bus = bus;
    }

    private Consumer<Message> createGenerator(Function<Message, T> supplier) {
        return message -> {
            if (supplier != null) {
                String returnChannel = this.config.getReturnChannel();
                Object response = supplier.apply((Message)message);
                if (message.getId() != null) {
                    this.bus.sendResponseMessageWithId(returnChannel, response, message.getId());
                } else {
                    this.bus.sendResponseMessage(returnChannel, response);
                }
            }
            if (this.config.isSingleResponse()) {
                this.bus.closeChannel(this.config.getSendChannel(), this.getClass().getName());
            }
        };
    }

    @Override
    public Disposable generate(Function<Message, T> generator) {
        this.channel = this.bus.getApi().getRequestChannel(this.config.getSendChannel(), this.getClass().getName());
        this.sub = this.config.isSingleResponse() ? this.channel.take(1L).subscribe(this.createGenerator(generator)) : this.channel.subscribe(this.createGenerator(generator));
        return this.sub;
    }

    @Override
    public void tick(T payload) {
        if (this.sub != null && !this.sub.isDisposed()) {
            this.bus.sendResponseMessage(this.config.getReturnChannel(), payload);
        }
    }

    @Override
    public void error(T payload) {
        if (this.sub != null && !this.sub.isDisposed()) {
            this.bus.sendErrorMessage(this.config.getReturnChannel(), payload);
        }
    }

    @Override
    public void close() {
        if (this.sub != null && !this.sub.isDisposed()) {
            this.sub.dispose();
        }
    }

    @Override
    public boolean isClosed() {
        return this.sub != null && this.sub.isDisposed();
    }
}

