/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.dataflow.ops;

import com.espertech.esper.client.EPException;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.EventType;
import com.espertech.esper.client.dataflow.EPDataFlowEventCollector;
import com.espertech.esper.client.dataflow.EPDataFlowEventCollectorContext;
import com.espertech.esper.client.dataflow.EventBusCollector;
import com.espertech.esper.core.service.EPRuntimeEventSender;
import com.espertech.esper.dataflow.annotations.DataFlowOpParameter;
import com.espertech.esper.dataflow.annotations.DataFlowOperator;
import com.espertech.esper.dataflow.interfaces.DataFlowOpCloseContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializateContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializeResult;
import com.espertech.esper.dataflow.interfaces.DataFlowOpLifecycle;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOpenContext;
import com.espertech.esper.event.EventAdapterService;
import com.espertech.esper.event.EventBeanAdapterFactory;
import java.util.Map;
import org.w3c.dom.Node;

@DataFlowOperator
public class EventBusSink
implements DataFlowOpLifecycle {
    private EventAdapterService eventAdapterService;
    private EPRuntimeEventSender runtimeEventSender;
    @DataFlowOpParameter
    private EPDataFlowEventCollector collector;
    private EventBusCollector eventBusCollector;
    private EventBeanAdapterFactory[] adapterFactories;
    private ThreadLocal<EPDataFlowEventCollectorContext> collectorDataTL = new ThreadLocal<EPDataFlowEventCollectorContext>(){

        @Override
        protected synchronized EPDataFlowEventCollectorContext initialValue() {
            return null;
        }
    };

    @Override
    public DataFlowOpInitializeResult initialize(DataFlowOpInitializateContext context) throws Exception {
        int i;
        if (!context.getOutputPorts().isEmpty()) {
            throw new IllegalArgumentException("EventBusSink operator does not provide an output stream");
        }
        EventType[] eventTypes = new EventType[context.getInputPorts().size()];
        for (i = 0; i < eventTypes.length; ++i) {
            eventTypes[i] = context.getInputPorts().get(i).getTypeDesc().getEventType();
        }
        this.runtimeEventSender = context.getRuntimeEventSender();
        this.eventAdapterService = context.getStatementContext().getEventAdapterService();
        if (this.collector != null) {
            this.eventBusCollector = new EventBusCollector(){

                @Override
                public void sendEvent(Object object) throws EPException {
                    EventBean event = EventBusSink.this.eventAdapterService.adapterForBean(object);
                    EventBusSink.this.runtimeEventSender.processWrappedEvent(event);
                }

                @Override
                public void sendEvent(Map map, String eventTypeName) throws EPException {
                    EventBean event = EventBusSink.this.eventAdapterService.adapterForMap(map, eventTypeName);
                    EventBusSink.this.runtimeEventSender.processWrappedEvent(event);
                }

                @Override
                public void sendEvent(Object[] objectArray, String eventTypeName) throws EPException {
                    EventBean event = EventBusSink.this.eventAdapterService.adapterForObjectArray(objectArray, eventTypeName);
                    EventBusSink.this.runtimeEventSender.processWrappedEvent(event);
                }

                @Override
                public void sendEvent(Node node) throws EPException {
                    EventBean event = EventBusSink.this.eventAdapterService.adapterForDOM(node);
                    EventBusSink.this.runtimeEventSender.processWrappedEvent(event);
                }
            };
        } else {
            this.adapterFactories = new EventBeanAdapterFactory[eventTypes.length];
            for (i = 0; i < eventTypes.length; ++i) {
                this.adapterFactories[i] = context.getServicesContext().getEventAdapterService().getAdapterFactoryForType(eventTypes[i]);
            }
        }
        return null;
    }

    public void onInput(int port, Object data) {
        if (this.eventBusCollector != null) {
            EPDataFlowEventCollectorContext holder = this.collectorDataTL.get();
            if (holder == null) {
                holder = new EPDataFlowEventCollectorContext(this.eventBusCollector, data);
                this.collectorDataTL.set(holder);
            } else {
                holder.setEvent(data);
            }
            this.collector.collect(holder);
        } else if (data instanceof EventBean) {
            this.runtimeEventSender.processWrappedEvent((EventBean)data);
        } else {
            EventBean event = this.adapterFactories[port].makeAdapter(data);
            this.runtimeEventSender.processWrappedEvent(event);
        }
    }

    @Override
    public void open(DataFlowOpOpenContext openContext) {
    }

    @Override
    public void close(DataFlowOpCloseContext openContext) {
    }
}

