/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.source.polling.watermark.selector;

import java.io.Serializable;
import java.util.Iterator;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.ConfigurationException;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.message.InternalMessage;
import org.mule.runtime.core.api.store.ObjectStoreException;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.internal.streaming.object.ProvidesTotalHint;
import org.mule.runtime.core.source.polling.watermark.Watermark;
import org.mule.runtime.core.source.polling.watermark.WatermarkPollingInterceptor;
import org.mule.runtime.core.source.polling.watermark.selector.WatermarkSelector;
import org.mule.runtime.core.source.polling.watermark.selector.WatermarkSelectorWrapper;

public class SelectorWatermarkPollingInterceptor
extends WatermarkPollingInterceptor
implements MuleContextAware {
    private final WatermarkSelector selector;
    private final String selectorExpression;
    private MuleContext muleContext;

    public SelectorWatermarkPollingInterceptor(Watermark watermark, WatermarkSelector selector, String selectorExpression) {
        super(watermark);
        this.selector = selector;
        this.selectorExpression = selectorExpression;
    }

    @Override
    public Event prepareRouting(Event sourceEvent, Event event, FlowConstruct flow) throws ConfigurationException {
        event = super.prepareRouting(sourceEvent, event, flow);
        Object payload = event.getMessage().getPayload().getValue();
        WatermarkSelectorWrapper selector = new WatermarkSelectorWrapper(this.selector, this.selectorExpression, event, this.muleContext);
        if (payload instanceof Iterable) {
            for (Object object : (Iterable)payload) {
                ((WatermarkSelector)selector).acceptValue(object);
            }
        } else if (payload instanceof Iterator) {
            event = Event.builder(event).message(InternalMessage.builder(event.getMessage()).payload(new SelectorIteratorProxy((Iterator)payload, selector)).build()).build();
        } else {
            throw new ConfigurationException(CoreMessages.createStaticMessage((String)String.format("Poll executing with payload of class %s but selector can only handle Iterator and Iterable objects when watermark is to be updated via selectors", payload.getClass().getCanonicalName())));
        }
        return event;
    }

    @Override
    public void postProcessRouting(Event event) throws ObjectStoreException {
        this.watermark.updateWith(event, (Serializable)this.selector.getSelectedValue());
    }

    @Override
    public void setMuleContext(MuleContext context) {
        this.muleContext = context;
    }

    private static class SelectorIteratorProxy<T>
    implements Iterator<T>,
    ProvidesTotalHint {
        private final Iterator<T> delegate;
        private final WatermarkSelector selector;

        private SelectorIteratorProxy(Iterator<T> delegate, WatermarkSelector selector) {
            this.delegate = delegate;
            this.selector = selector;
        }

        @Override
        public boolean hasNext() {
            return this.delegate.hasNext();
        }

        @Override
        public T next() {
            T next = this.delegate.next();
            this.selector.acceptValue(next);
            return next;
        }

        @Override
        public void remove() {
            this.delegate.remove();
        }

        @Override
        public int size() {
            return this.delegate instanceof ProvidesTotalHint ? ((ProvidesTotalHint)((Object)this.delegate)).size() : -1;
        }
    }
}

