/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpAdapter<InT, OutT, K>
implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
WatermarkFunction<OpMessage<OutT>>,
ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
    private final Op<InT, OutT, K> op;
    private transient List<OpMessage<OutT>> outputList;
    private transient Instant outputWatermark;
    private transient OpEmitter<OutT> emitter;
    private transient Config config;
    private transient Context context;

    public static <InT, OutT, K> FlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(Op<InT, OutT, K> op) {
        return new OpAdapter<InT, OutT, K>(op);
    }

    private OpAdapter(Op<InT, OutT, K> op) {
        this.op = op;
    }

    public final void init(Context context) {
        this.outputList = new ArrayList<OpMessage<OutT>>();
        this.emitter = new OpEmitterImpl();
        this.config = context.getJobContext().getConfig();
        this.context = context;
    }

    public final void schedule(Scheduler<KeyedTimerData<K>> timerRegistry) {
        assert (this.context != null);
        this.op.open(this.config, this.context, timerRegistry, this.emitter);
    }

    public Collection<OpMessage<OutT>> apply(OpMessage<InT> message) {
        assert (this.outputList.isEmpty());
        try {
            switch (message.getType()) {
                case ELEMENT: {
                    this.op.processElement(message.getElement(), this.emitter);
                    break;
                }
                case SIDE_INPUT: {
                    this.op.processSideInput(message.getViewId(), message.getViewElements(), this.emitter);
                    break;
                }
                case SIDE_INPUT_WATERMARK: {
                    this.op.processSideInputWatermark(message.getSideInputWatermark(), this.emitter);
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("Unexpected input type: %s", new Object[]{message.getType()}));
                }
            }
        }
        catch (Exception e) {
            LOG.error("Op {} threw an exception during processing", (Object)this.getClass().getName(), (Object)e);
            throw UserCodeException.wrap((Throwable)e);
        }
        ArrayList<OpMessage<OutT>> results = new ArrayList<OpMessage<OutT>>(this.outputList);
        this.outputList.clear();
        return results;
    }

    public Collection<OpMessage<OutT>> processWatermark(long time) {
        assert (this.outputList.isEmpty());
        try {
            this.op.processWatermark(new Instant(time), this.emitter);
        }
        catch (Exception e) {
            LOG.error("Op {} threw an exception during processing watermark", (Object)this.getClass().getName(), (Object)e);
            throw UserCodeException.wrap((Throwable)e);
        }
        ArrayList<OpMessage<OutT>> results = new ArrayList<OpMessage<OutT>>(this.outputList);
        this.outputList.clear();
        return results;
    }

    public Long getOutputWatermark() {
        return this.outputWatermark != null ? Long.valueOf(this.outputWatermark.getMillis()) : null;
    }

    public Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long time) {
        assert (this.outputList.isEmpty());
        try {
            this.op.processTimer(keyedTimerData, this.emitter);
        }
        catch (Exception e) {
            LOG.error("Op {} threw an exception during processing timer", (Object)this.getClass().getName(), (Object)e);
            throw UserCodeException.wrap((Throwable)e);
        }
        ArrayList<OpMessage<OutT>> results = new ArrayList<OpMessage<OutT>>(this.outputList);
        this.outputList.clear();
        return results;
    }

    public void close() {
        this.op.close();
    }

    private class OpEmitterImpl
    implements OpEmitter<OutT> {
        private OpEmitterImpl() {
        }

        @Override
        public void emitElement(WindowedValue<OutT> element) {
            OpAdapter.this.outputList.add(OpMessage.ofElement(element));
        }

        @Override
        public void emitWatermark(Instant watermark) {
            OpAdapter.this.outputWatermark = watermark;
        }

        @Override
        public <T> void emitView(String id, WindowedValue<Iterable<T>> elements) {
            OpAdapter.this.outputList.add(OpMessage.ofSideInput(id, elements));
        }
    }
}

