/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.FutureCollector;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DoFnRunner} wrapper that runs {@link #processElement} on an {@link ExecutorService}
 * and registers the resulting output future with a {@link FutureCollector}.
 *
 * <p>Every other {@link DoFnRunner} callback is forwarded directly to the wrapped runner on the
 * calling thread.
 *
 * @param <InT> element type consumed by the wrapped runner
 * @param <OutT> element type produced by the wrapped runner
 */
public class AsyncDoFnRunner<InT, OutT> implements DoFnRunner<InT, OutT> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFnRunner.class);

    /** Runner that every callback is ultimately forwarded to. */
    private final DoFnRunner<InT, OutT> underlying;

    /** Executor on which {@code underlying.processElement} is run. */
    private final ExecutorService executor;

    /** Emitter whose collected output is read after an element has been processed. */
    private final OpEmitter<OutT> emitter;

    /** Receives one future of output elements per {@link #processElement} call. */
    private final FutureCollector<OutT> futureCollector;

    /**
     * Creates an {@code AsyncDoFnRunner} around {@code runner} and logs the runner class name at
     * INFO level.
     *
     * @param runner the runner to delegate to
     * @param emitter the emitter whose output is collected after each element
     * @param futureCollector the collector that receives the per-element output futures
     * @param options pipeline options supplying the executor via
     *     {@code getExecutorServiceForProcessElement()}
     * @return the new wrapper
     */
    public static <InT, OutT> AsyncDoFnRunner<InT, OutT> create(DoFnRunner<InT, OutT> runner, OpEmitter<OutT> emitter, FutureCollector<OutT> futureCollector, SamzaPipelineOptions options) {
        // Parameterized logging renders the same message without eager string concatenation.
        LOG.info("Run DoFn with {}", AsyncDoFnRunner.class.getName());
        return new AsyncDoFnRunner<>(runner, emitter, futureCollector, options);
    }

    /** Stores the collaborators and reads the executor from {@code options}. */
    private AsyncDoFnRunner(DoFnRunner<InT, OutT> runner, OpEmitter<OutT> emitter, FutureCollector<OutT> futureCollector, SamzaPipelineOptions options) {
        this.underlying = runner;
        this.executor = options.getExecutorServiceForProcessElement();
        this.emitter = emitter;
        this.futureCollector = futureCollector;
    }

    /** Forwards to the wrapped runner's {@code startBundle()}. */
    public void startBundle() {
        this.underlying.startBundle();
    }

    /**
     * Submits {@code elem} to the wrapped runner on the executor and registers a future of the
     * resulting output with the future collector. This method does not wait for the submitted
     * task to finish.
     *
     * <p>Once the task completes normally, the emitter's collected output is mapped from
     * {@link OpMessage} to its element and gathered into a list. If the task fails, the
     * registered future completes exceptionally and the emitter is not read for this element.
     *
     * @param elem the element to process
     */
    public void processElement(WindowedValue<InT> elem) {
        CompletableFuture<Void> processing =
            CompletableFuture.runAsync(() -> this.underlying.processElement(elem), this.executor);
        // Hand the dependent stage straight to addAll instead of parking it in a raw-typed
        // local: the compiler then infers and checks the generic type addAll expects.
        this.futureCollector.addAll(
            processing.thenApply(
                ignored ->
                    this.emitter.collectOutput().stream()
                        .map(OpMessage::getElement)
                        .collect(Collectors.toList())));
    }

    /** Forwards to the wrapped runner's {@code onTimer} with all arguments unchanged. */
    public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        this.underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
    }

    /** Forwards to the wrapped runner's {@code finishBundle()}. */
    public void finishBundle() {
        this.underlying.finishBundle();
    }

    /** Forwards to the wrapped runner's {@code onWindowExpiration} with all arguments unchanged. */
    public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
        this.underlying.onWindowExpiration(window, timestamp, key);
    }

    /**
     * Returns the {@link DoFn} reported by the wrapped runner.
     *
     * @return the result of {@code underlying.getFn()}
     */
    public DoFn<InT, OutT> getFn() {
        return this.underlying.getFn();
    }
}

