/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.FutureCollector;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DoFnRunner} decorator that executes {@code processElement} of the wrapped runner
 * asynchronously on the executor obtained from
 * {@link SamzaPipelineOptions#getExecutorServiceForProcessElement()}.
 *
 * <p>For stateful DoFns the processing of an element is chained to the completion of the
 * previously submitted element with the same key, so elements of one key are processed in
 * submission order. All other callbacks (bundle lifecycle, timers, window expiration) are
 * delegated to the wrapped runner synchronously on the calling thread.
 *
 * @param <InT> input element type of the wrapped DoFn
 * @param <OutT> output element type of the wrapped DoFn
 */
public class AsyncDoFnRunner<InT, OutT>
implements DoFnRunner<InT, OutT> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFnRunner.class);

    /** Stand-in map key for null elements/keys, because {@link ConcurrentHashMap} rejects null keys. */
    private static final Object NULL_KEY = new Object();

    private final DoFnRunner<InT, OutT> underlying;
    private final ExecutorService executor;
    private final OpEmitter<OutT> emitter;
    private final FutureCollector<OutT> futureCollector;
    private final boolean isStateful;

    /** Latest not-yet-completed output future per key; only populated when {@code isStateful} is true. */
    private final Map<Object, CompletableFuture<Collection<WindowedValue<OutT>>>> keyedOutputFutures;

    /**
     * Creates an {@code AsyncDoFnRunner} wrapping {@code runner}.
     *
     * @param runner the runner whose {@code processElement} is executed asynchronously
     * @param emitter emitter from which the outputs of each processed element are collected
     * @param futureCollector receiver of the output future of every processed element
     * @param isStateful whether processing must be serialized per key
     * @param options pipeline options supplying the executor service
     * @return the new runner
     */
    public static <InT, OutT> AsyncDoFnRunner<InT, OutT> create(DoFnRunner<InT, OutT> runner, OpEmitter<OutT> emitter, FutureCollector<OutT> futureCollector, boolean isStateful, SamzaPipelineOptions options) {
        LOG.info("Run DoFn with {}", AsyncDoFnRunner.class.getName());
        return new AsyncDoFnRunner<InT, OutT>(runner, emitter, futureCollector, isStateful, options);
    }

    private AsyncDoFnRunner(DoFnRunner<InT, OutT> runner, OpEmitter<OutT> emitter, FutureCollector<OutT> futureCollector, boolean isStateful, SamzaPipelineOptions options) {
        this.underlying = runner;
        this.executor = options.getExecutorServiceForProcessElement();
        this.emitter = emitter;
        this.futureCollector = futureCollector;
        this.isStateful = isStateful;
        this.keyedOutputFutures = new ConcurrentHashMap<>();
    }

    /** Delegates to the wrapped runner. */
    @Override
    public void startBundle() {
        this.underlying.startBundle();
    }

    /**
     * Schedules {@code elem} for asynchronous processing and hands the resulting output future
     * to the {@link FutureCollector}. Stateful runners go through the per-key chaining path.
     *
     * @param elem the element to process
     */
    @Override
    public void processElement(WindowedValue<InT> elem) {
        CompletableFuture<Collection<WindowedValue<OutT>>> outputFutures = this.isStateful ? this.processStateful(elem) : this.processElement(elem, null);
        this.futureCollector.addAll(outputFutures);
    }

    /**
     * Runs the wrapped runner's {@code processElement} on the executor once
     * {@code prevOutputFuture} has completed, then collects the emitter's output.
     *
     * @param elem the element to process
     * @param prevOutputFuture future to wait for before processing, or {@code null} to start
     *     without waiting
     * @return a future holding the elements collected from the emitter after processing; it
     *     completes exceptionally if {@code prevOutputFuture} or the processing itself fails
     */
    private CompletableFuture<Collection<WindowedValue<OutT>>> processElement(WindowedValue<InT> elem, @Nullable CompletableFuture<Collection<WindowedValue<OutT>>> prevOutputFuture) {
        CompletableFuture<Collection<WindowedValue<OutT>>> prevFuture = prevOutputFuture == null ? CompletableFuture.completedFuture(Collections.emptyList()) : prevOutputFuture;
        // Chaining onto the previous future is what provides ordering; its result is not used.
        return prevFuture.thenApplyAsync(ignored -> {
            this.underlying.processElement(elem);
            return this.emitter.collectOutput().stream().map(OpMessage::getElement).collect(Collectors.toList());
        }, this.executor);
    }

    /**
     * Processes {@code elem} after the latest pending future of the same key, records the new
     * future as the latest for that key, and drops the record once the future is done.
     *
     * @param elem the element to process; its value is expected to be a {@link KV}
     * @return a future with the same result or exception as the processing of {@code elem}
     */
    private CompletableFuture<Collection<WindowedValue<OutT>>> processStateful(WindowedValue<InT> elem) {
        Object key = this.getKey(elem);
        CompletableFuture<Collection<WindowedValue<OutT>>> outputFutures = this.processElement(elem, this.keyedOutputFutures.get(key));
        // Publish this future as the latest one for the key so the next element chains onto it.
        this.keyedOutputFutures.put(key, outputFutures);
        // whenComplete (rather than thenApply) so the entry is also removed when processing
        // fails; otherwise a failed future would stay in the map and every later element of the
        // key would chain onto it and fail without being processed. The two-argument remove
        // leaves the entry alone if a newer future has already replaced this one.
        return outputFutures.whenComplete((output, error) -> this.keyedOutputFutures.remove(key, outputFutures));
    }

    /**
     * Reports whether an output future is currently tracked for {@code key}.
     *
     * @param key the key to look up; {@code null} refers to elements without a key
     * @return {@code true} if a future is tracked for the key
     */
    boolean hasOutputFuturesForKey(Object key) {
        // Null keys are tracked under NULL_KEY (see getKey); ConcurrentHashMap would throw on null.
        return this.keyedOutputFutures.containsKey(key == null ? NULL_KEY : key);
    }

    /** Delegates to the wrapped runner on the calling thread. */
    @Override
    public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        this.underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
    }

    /** Delegates to the wrapped runner. */
    @Override
    public void finishBundle() {
        this.underlying.finishBundle();
    }

    /** Delegates to the wrapped runner on the calling thread. */
    @Override
    public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
        this.underlying.onWindowExpiration(window, timestamp, key);
    }

    /** Returns the DoFn of the wrapped runner. */
    @Override
    public DoFn<InT, OutT> getFn() {
        return this.underlying.getFn();
    }

    /**
     * Extracts the map key for {@code elem}.
     *
     * @param elem an element whose value is cast to {@link KV}; a value of any other type
     *     results in a {@link ClassCastException}
     * @return the KV's key, or {@link #NULL_KEY} when the value or its key is {@code null}
     */
    private Object getKey(WindowedValue<InT> elem) {
        KV<?, ?> kv = (KV<?, ?>) elem.getValue();
        if (kv == null) {
            return NULL_KEY;
        }
        Object key = kv.getKey();
        return key == null ? NULL_KEY : key;
    }
}

