/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.runners.samza.util.SamzaPipelineExceptionListener;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adapts an {@link Op} to the Samza function interfaces implemented here
 * ({@link AsyncFlatMapFunction}, {@link WatermarkFunction} and {@link ScheduledFunction}).
 *
 * <p>Each callback delegates to the wrapped op, handing it a shared {@link OpEmitter}; whatever
 * the op emitted is then drained from that emitter and returned to the caller.
 *
 * <p>The message, watermark and timer callbacks are {@code synchronized} on this adapter, so they
 * never run concurrently with each other on the same instance.
 *
 * @param <InT> element type consumed by the wrapped op
 * @param <OutT> element type produced by the wrapped op
 * @param <K> key type of the timers handled by the wrapped op
 */
public class OpAdapter<@UnknownKeyFor InT, @UnknownKeyFor OutT, @UnknownKeyFor K>
implements AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
WatermarkFunction<OpMessage<OutT>>,
ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
    private final @UnknownKeyFor @NonNull @Initialized Op<InT, OutT, K> op;
    private final @UnknownKeyFor @NonNull @Initialized String transformFullName;
    // NOTE(review): transient and final, so default Java deserialization would leave this null;
    // it is only passed to the exception listeners. Confirm that is acceptable.
    private final transient @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions samzaPipelineOptions;
    // The transient fields below are (re)created in init(Context) rather than serialized.
    private transient @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter;
    private transient @UnknownKeyFor @NonNull @Initialized Config config;
    private transient @UnknownKeyFor @NonNull @Initialized Context context;
    private transient @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized SamzaPipelineExceptionListener.Registrar> exceptionListeners;

    /**
     * Wraps {@code op} in an adapter, taking the transform name and pipeline options from
     * {@code ctx}.
     *
     * @param op the op to delegate to
     * @param ctx translation context supplying the transform's full name and pipeline options
     * @return the adapter, typed as an {@link AsyncFlatMapFunction}
     */
    public static <InT, OutT, K> @UnknownKeyFor @NonNull @Initialized AsyncFlatMapFunction<@UnknownKeyFor @NonNull @Initialized OpMessage<InT>, @UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> adapt(@UnknownKeyFor @NonNull @Initialized Op<InT, OutT, K> op, @UnknownKeyFor @NonNull @Initialized TranslationContext ctx) {
        return new OpAdapter<InT, OutT, K>(op, ctx.getTransformFullName(), ctx.getPipelineOptions());
    }

    private OpAdapter(@UnknownKeyFor @NonNull @Initialized Op<InT, OutT, K> op, @UnknownKeyFor @NonNull @Initialized String transformFullName, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions samzaPipelineOptions) {
        this.op = op;
        this.transformFullName = transformFullName;
        this.samzaPipelineOptions = samzaPipelineOptions;
    }

    /**
     * Creates a fresh emitter, captures the job config and context, and loads every
     * {@link SamzaPipelineExceptionListener.Registrar} visible to {@link ServiceLoader}.
     *
     * @param context the Samza context this function runs in
     */
    public final void init(@UnknownKeyFor @NonNull @Initialized Context context) {
        this.emitter = new OpEmitterImpl<>();
        this.config = context.getJobContext().getConfig();
        this.context = context;
        this.exceptionListeners = StreamSupport.stream(ServiceLoader.load(SamzaPipelineExceptionListener.Registrar.class).spliterator(), false).collect(Collectors.toList());
    }

    /**
     * Opens the wrapped op with the config, context and emitter captured by
     * {@link #init(Context)} plus the supplied timer registry.
     *
     * @param timerRegistry scheduler the op uses to register timers
     */
    public final void schedule(@UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<K>> timerRegistry) {
        // init(Context) must have populated the context before the op can be opened.
        assert (this.context != null);
        this.op.open(this.config, this.context, timerRegistry, this.emitter);
    }

    /**
     * Dispatches {@code message} to the op method matching its type, then returns everything the
     * op emitted: the synchronously queued output combined with any pending output future.
     *
     * <p>Any exception (including the one raised for an unexpected message type) is logged,
     * reported to the exception listeners, and rethrown via {@link UserCodeException#wrap}.
     *
     * @param message the incoming element, side input, or side-input watermark
     * @return a stage completing with the emitted messages
     */
    public synchronized @UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>>> apply(@UnknownKeyFor @NonNull @Initialized OpMessage<InT> message) {
        try {
            switch (message.getType()) {
                case ELEMENT: {
                    this.op.processElement(message.getElement(), this.emitter);
                    break;
                }
                case SIDE_INPUT: {
                    this.op.processSideInput(message.getViewId(), message.getViewElements(), this.emitter);
                    break;
                }
                case SIDE_INPUT_WATERMARK: {
                    this.op.processSideInputWatermark(message.getSideInputWatermark(), this.emitter);
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("Unexpected input type: %s", message.getType()));
                }
            }
        }
        catch (Exception e) {
            LOG.error("Exception happened in transform: {}", this.transformFullName, e);
            this.notifyExceptionListeners(this.transformFullName, e, this.samzaPipelineOptions);
            throw UserCodeException.wrap(e);
        }
        CompletionStage<Collection<OpMessage<OutT>>> resultFuture = CompletableFuture.completedFuture(this.emitter.collectOutput());
        // collectFuture() yields null when the op emitted no future;
        // assumes FutureUtils.combineFutures tolerates a null argument - TODO confirm.
        return FutureUtils.combineFutures(resultFuture, this.emitter.collectFuture());
    }

    /**
     * Forwards a watermark to the op and returns the messages it emitted in response.
     *
     * @param time watermark timestamp in epoch milliseconds
     * @return the messages queued by the op while handling the watermark
     */
    public synchronized @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> processWatermark(@UnknownKeyFor @NonNull @Initialized long time) {
        try {
            this.op.processWatermark(new Instant(time), this.emitter);
        }
        catch (Exception e) {
            // Report the wrapped op's class; this adapter's own class name is always the same.
            LOG.error("Op {} threw an exception during processing watermark", this.op.getClass().getName(), e);
            throw UserCodeException.wrap(e);
        }
        return this.emitter.collectOutput();
    }

    /**
     * Returns, and clears, the watermark most recently emitted by the op.
     *
     * <p>NOTE(review): the emitter returns {@code null} when no watermark is pending, despite the
     * {@code @NonNull} annotation on this method.
     */
    public synchronized @UnknownKeyFor @NonNull @Initialized Long getOutputWatermark() {
        return this.emitter.collectWatermark();
    }

    /**
     * Fires a timer on the op and returns the messages it emitted in response.
     *
     * @param keyedTimerData the timer being fired
     * @param time callback time supplied by the caller; not used by this implementation
     * @return the messages queued by the op while handling the timer
     */
    public synchronized @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> onCallback(@UnknownKeyFor @NonNull @Initialized KeyedTimerData<K> keyedTimerData, @UnknownKeyFor @NonNull @Initialized long time) {
        try {
            this.op.processTimer(keyedTimerData, this.emitter);
        }
        catch (Exception e) {
            // Report the wrapped op's class; this adapter's own class name is always the same.
            LOG.error("Op {} threw an exception during processing timer", this.op.getClass().getName(), e);
            throw UserCodeException.wrap(e);
        }
        return this.emitter.collectOutput();
    }

    /** Closes the wrapped op. */
    public void close() {
        this.op.close();
    }

    /**
     * Best-effort notification of every registered exception listener.
     *
     * <p>Listeners are isolated from one another: a failure in one is logged and does not stop
     * the remaining listeners from being notified, and never propagates to the caller.
     *
     * @param transformFullName name of the transform in which {@code e} occurred
     * @param e the exception being reported
     * @param samzaPipelineOptions options handed to each registrar to obtain its listener
     */
    private void notifyExceptionListeners(@UnknownKeyFor @NonNull @Initialized String transformFullName, @UnknownKeyFor @NonNull @Initialized Exception e, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions samzaPipelineOptions) {
        if (this.exceptionListeners == null) {
            // init(Context) has not run, so there is nobody to notify.
            return;
        }
        for (SamzaPipelineExceptionListener.Registrar registrar : this.exceptionListeners) {
            try {
                registrar.getExceptionListener(samzaPipelineOptions).onException(new SamzaPipelineExceptionContext(transformFullName, e));
            }
            catch (Exception listenerFailure) {
                // Deliberately not rethrown: a broken listener must not mask the original failure.
                LOG.warn("Exception listener failed for transform: {}", transformFullName, listenerFailure);
            }
        }
    }

    /**
     * {@link OpEmitter} that buffers emitted messages in a queue and remembers the latest emitted
     * watermark and the combined output future until they are collected.
     *
     * @param <OutT> element type of the emitted messages
     */
    static class OpEmitterImpl<@UnknownKeyFor OutT>
    implements OpEmitter<OutT> {
        private final @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> outputQueue = new ConcurrentLinkedQueue<OpMessage<OutT>>();
        // Null until emitFuture is called, and reset to null by collectFuture.
        private @UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>>> outputFuture;
        // Null until emitWatermark is called, and reset to null by collectWatermark.
        private @UnknownKeyFor @NonNull @Initialized Instant outputWatermark;

        OpEmitterImpl() {
        }

        /** Queues {@code element} as an element message. */
        @Override
        public void emitElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<OutT> element) {
            this.outputQueue.add(OpMessage.ofElement(element));
        }

        /**
         * Wraps each value of the eventual result as an element message and merges that stage
         * into the pending output future.
         */
        @Override
        public void emitFuture(@UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutT>>> resultFuture) {
            CompletionStage<Collection<OpMessage<OutT>>> resultFutureWrapped = resultFuture.thenApply(res -> res.stream().map(OpMessage::ofElement).collect(Collectors.toList()));
            // outputFuture is null for the first future of a batch;
            // assumes FutureUtils.combineFutures tolerates a null argument - TODO confirm.
            this.outputFuture = FutureUtils.combineFutures(this.outputFuture, resultFutureWrapped);
        }

        /** Records {@code watermark}, replacing any watermark not yet collected. */
        @Override
        public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark) {
            this.outputWatermark = watermark;
        }

        /** Queues a side-input message for view {@code id} carrying {@code elements}. */
        @Override
        public <T> void emitView(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized Iterable<T>> elements) {
            this.outputQueue.add(OpMessage.ofSideInput(id, elements));
        }

        /**
         * Drains the queue into a new list, in queue order.
         *
         * @return the drained messages; empty if nothing was queued
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> collectOutput() {
            OpMessage<OutT> output;
            List<OpMessage<OutT>> outputList = new ArrayList<OpMessage<OutT>>();
            while ((output = this.outputQueue.poll()) != null) {
                outputList.add(output);
            }
            return outputList;
        }

        /**
         * Returns the pending output future and clears it.
         *
         * @return the combined future, or {@code null} if none was emitted since the last call
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>>> collectFuture() {
            CompletionStage<Collection<OpMessage<OutT>>> future = this.outputFuture;
            this.outputFuture = null;
            return future;
        }

        /**
         * Returns the pending watermark in epoch milliseconds and clears it.
         *
         * @return the watermark millis, or {@code null} if none was emitted since the last call
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Long collectWatermark() {
            Instant watermark = this.outputWatermark;
            this.outputWatermark = null;
            return watermark == null ? null : Long.valueOf(watermark.getMillis());
        }
    }
}

