/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.runners.samza.util.SamzaPipelineExceptionListener;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adapts an {@link Op} to the Samza function interfaces implemented by this class
 * ({@link AsyncFlatMapFunction}, {@link WatermarkFunction} and {@link ScheduledFunction}).
 *
 * <p>Every incoming message, watermark and timer callback is delegated to the wrapped op together
 * with a per-adapter {@link OpEmitter}; whatever the op emitted is then drained from the emitter
 * and returned to the caller.
 *
 * <p>The methods that touch the emitter ({@link #apply}, {@link #processWatermark},
 * {@link #getOutputWatermark} and {@link #onCallback}) are {@code synchronized} on this instance.
 *
 * @param <InT> element type consumed by the wrapped op
 * @param <OutT> element type produced by the wrapped op
 * @param <K> key type of the timers handled by the wrapped op
 */
public class OpAdapter<InT, OutT, K>
        implements AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
                WatermarkFunction<OpMessage<OutT>>,
                ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
                Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);

    /** The op every call is delegated to. */
    private final Op<InT, OutT, K> op;

    /** Full name of the transform this op belongs to; used in logs and exception notifications. */
    private final String transformFullName;

    // The fields below are transient (not serialized) and are assigned in init(Context).
    private transient OpEmitter<OutT> emitter;
    private transient Config config;
    private transient Context context;
    private transient List<SamzaPipelineExceptionListener.Registrar> exceptionListeners;

    /**
     * Wraps {@code op} in an adapter.
     *
     * @param op the op to wrap
     * @param ctx translation context; only its transform full name is read
     * @return the adapter, exposed as an {@link AsyncFlatMapFunction}
     */
    public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(Op<InT, OutT, K> op, TranslationContext ctx) {
        return new OpAdapter<>(op, ctx.getTransformFullName());
    }

    private OpAdapter(Op<InT, OutT, K> op, String transformFullName) {
        this.op = op;
        this.transformFullName = transformFullName;
    }

    /**
     * Initializes the transient state: creates a fresh emitter, captures the job config and the
     * context, and loads all {@link SamzaPipelineExceptionListener.Registrar} implementations
     * through {@link ServiceLoader}.
     *
     * @param context the Samza context for this function
     */
    public final void init(Context context) {
        this.emitter = new OpEmitterImpl<>();
        this.config = context.getJobContext().getConfig();
        this.context = context;
        this.exceptionListeners = StreamSupport.stream(ServiceLoader.load(SamzaPipelineExceptionListener.Registrar.class).spliterator(), false).collect(Collectors.toList());
    }

    /**
     * Opens the wrapped op, handing it the config, context and emitter captured by
     * {@link #init(Context)} plus the given timer registry.
     *
     * @param timerRegistry scheduler the op can use to register keyed timers
     */
    public final void schedule(Scheduler<KeyedTimerData<K>> timerRegistry) {
        // Only checked when assertions are enabled: init(Context) must have run first.
        assert (this.context != null);
        this.op.open(this.config, this.context, timerRegistry, this.emitter);
    }

    /**
     * Dispatches {@code message} to the wrapped op according to its type and returns everything
     * the op emitted while handling it.
     *
     * @param message the incoming message
     * @return a stage that combines the synchronously emitted output with any future the op
     *     emitted during this call
     * @throws RuntimeException the failure wrapped by {@link UserCodeException#wrap} if the op
     *     throws or the message type is not handled; exception listeners are notified first
     */
    public synchronized CompletionStage<Collection<OpMessage<OutT>>> apply(OpMessage<InT> message) {
        try {
            switch (message.getType()) {
                case ELEMENT: {
                    this.op.processElement(message.getElement(), this.emitter);
                    break;
                }
                case SIDE_INPUT: {
                    this.op.processSideInput(message.getViewId(), message.getViewElements(), this.emitter);
                    break;
                }
                case SIDE_INPUT_WATERMARK: {
                    this.op.processSideInputWatermark(message.getSideInputWatermark(), this.emitter);
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("Unexpected input type: %s", message.getType()));
                }
            }
        }
        catch (Exception e) {
            LOG.error("Exception happened in transform: {}", this.transformFullName, e);
            this.notifyExceptionListeners(this.transformFullName, e);
            throw UserCodeException.wrap(e);
        }
        CompletionStage<Collection<OpMessage<OutT>>> resultFuture = CompletableFuture.completedFuture(this.emitter.collectOutput());
        return FutureUtils.combineFutures(resultFuture, this.emitter.collectFuture());
    }

    /**
     * Forwards a watermark to the wrapped op and returns the output it emitted in response.
     *
     * @param time watermark timestamp in epoch milliseconds
     * @return the messages emitted while processing the watermark
     * @throws RuntimeException the failure wrapped by {@link UserCodeException#wrap} if the op throws
     */
    public synchronized Collection<OpMessage<OutT>> processWatermark(long time) {
        try {
            this.op.processWatermark(new Instant(time), this.emitter);
        }
        catch (Exception e) {
            // Log the transform name (as apply() does) so the failing op can be identified.
            LOG.error("Op {} threw an exception during processing watermark", this.transformFullName, e);
            throw UserCodeException.wrap(e);
        }
        return this.emitter.collectOutput();
    }

    /**
     * Returns and clears the watermark most recently emitted by the op.
     *
     * @return the pending output watermark in epoch milliseconds, or {@code null} if none is pending
     */
    public synchronized Long getOutputWatermark() {
        return this.emitter.collectWatermark();
    }

    /**
     * Forwards a fired timer to the wrapped op and returns the output it emitted in response.
     *
     * @param keyedTimerData the timer that fired
     * @param time callback time supplied by the caller; not used by this adapter
     * @return the messages emitted while processing the timer
     * @throws RuntimeException the failure wrapped by {@link UserCodeException#wrap} if the op throws
     */
    public synchronized Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long time) {
        try {
            this.op.processTimer(keyedTimerData, this.emitter);
        }
        catch (Exception e) {
            // Log the transform name (as apply() does) so the failing op can be identified.
            LOG.error("Op {} threw an exception during processing timer", this.transformFullName, e);
            throw UserCodeException.wrap(e);
        }
        return this.emitter.collectOutput();
    }

    /** Closes the wrapped op. */
    public void close() {
        this.op.close();
    }

    /**
     * Best-effort notification of every registered exception listener.
     *
     * <p>Each listener is invoked in isolation: a listener that throws is logged and skipped so
     * the remaining listeners are still notified, and no {@link Exception} escapes this method.
     *
     * @param transformFullName name of the transform in which the failure occurred
     * @param e the failure to report
     */
    private void notifyExceptionListeners(String transformFullName, Exception e) {
        if (this.exceptionListeners == null) {
            // init(Context) has not run, so there is nobody to notify.
            return;
        }
        for (SamzaPipelineExceptionListener.Registrar registrar : this.exceptionListeners) {
            try {
                registrar.getExceptionListener().onException(new SamzaPipelineExceptionContext(transformFullName, e));
            }
            catch (Exception listenerFailure) {
                // A misbehaving listener must not mask the original failure.
                LOG.warn("Exception listener {} failed while handling an error in transform: {}", registrar, transformFullName, listenerFailure);
            }
        }
    }

    /**
     * Emitter that buffers what an op emits until the adapter collects it.
     *
     * <p>Each {@code collect*} method hands back the buffered value and resets it.
     */
    private static class OpEmitterImpl<OutT>
            implements OpEmitter<OutT> {
        private final Queue<OpMessage<OutT>> outputQueue = new ConcurrentLinkedQueue<>();
        private CompletionStage<Collection<OpMessage<OutT>>> outputFuture;
        private Instant outputWatermark;

        private OpEmitterImpl() {
        }

        /** Buffers {@code element} as an element message. */
        @Override
        public void emitElement(WindowedValue<OutT> element) {
            this.outputQueue.add(OpMessage.ofElement(element));
        }

        /**
         * Registers an asynchronous result: its values are wrapped as element messages and the
         * stage is combined with any future registered earlier.
         */
        @Override
        public void emitFuture(CompletionStage<Collection<WindowedValue<OutT>>> resultFuture) {
            CompletionStage<Collection<OpMessage<OutT>>> resultFutureWrapped = resultFuture.thenApply(res -> res.stream().map(OpMessage::ofElement).collect(Collectors.toList()));
            // NOTE(review): outputFuture is null until the first emitFuture call after a collect;
            // presumably FutureUtils.combineFutures tolerates a null operand - confirm.
            this.outputFuture = FutureUtils.combineFutures(this.outputFuture, resultFutureWrapped);
        }

        /** Records {@code watermark} as the pending output watermark, replacing any previous one. */
        @Override
        public void emitWatermark(Instant watermark) {
            this.outputWatermark = watermark;
        }

        /** Buffers a side-input message for view {@code id}. */
        @Override
        public <T> void emitView(String id, WindowedValue<Iterable<T>> elements) {
            this.outputQueue.add(OpMessage.ofSideInput(id, elements));
        }

        /** Drains the buffered messages, in queue order, into a new list. */
        @Override
        public Collection<OpMessage<OutT>> collectOutput() {
            List<OpMessage<OutT>> drained = new ArrayList<>();
            for (OpMessage<OutT> next = this.outputQueue.poll(); next != null; next = this.outputQueue.poll()) {
                drained.add(next);
            }
            return drained;
        }

        /** Returns the pending future (possibly {@code null}) and clears it. */
        @Override
        public CompletionStage<Collection<OpMessage<OutT>>> collectFuture() {
            CompletionStage<Collection<OpMessage<OutT>>> pending = this.outputFuture;
            this.outputFuture = null;
            return pending;
        }

        /**
         * Returns the pending watermark in epoch milliseconds and clears it, or {@code null} when
         * no watermark is pending.
         */
        @Override
        public Long collectWatermark() {
            Instant pending = this.outputWatermark;
            this.outputWatermark = null;
            if (pending == null) {
                return null;
            }
            return pending.getMillis();
        }
    }
}

