/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.samza.metrics.SamzaMetricOpFactory;
import org.apache.beam.runners.samza.metrics.SamzaTransformMetricRegistry;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SamzaGBKMetricOp<@UnknownKeyFor T>
implements Op<T, T, Void> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaGBKMetricOp.class);
    private final @UnknownKeyFor @NonNull @Initialized String transformFullName;
    private final @UnknownKeyFor @NonNull @Initialized SamzaTransformMetricRegistry samzaTransformMetricRegistry;
    private final @UnknownKeyFor @NonNull @Initialized SamzaMetricOpFactory.OpType opType;
    private final @UnknownKeyFor @NonNull @Initialized String pValue;
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized BoundedWindow, @UnknownKeyFor @NonNull @Initialized BigInteger> sumOfTimestampsPerWindowId;
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized BoundedWindow, @UnknownKeyFor @NonNull @Initialized Long> sumOfCountPerWindowId;
    private transient @UnknownKeyFor @NonNull @Initialized String task;

    @Override
    public void open(@UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized Context context, @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void>> timerRegistry, @UnknownKeyFor @NonNull @Initialized OpEmitter<T> emitter) {
        this.task = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
        this.samzaTransformMetricRegistry.register(this.transformFullName, this.pValue, context);
    }

    public SamzaGBKMetricOp(@UnknownKeyFor @NonNull @Initialized String pValue, @UnknownKeyFor @NonNull @Initialized String transformFullName, @UnknownKeyFor @NonNull @Initialized SamzaMetricOpFactory.OpType opType, @UnknownKeyFor @NonNull @Initialized SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
        this.pValue = pValue;
        this.transformFullName = transformFullName;
        this.opType = opType;
        this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
        this.sumOfTimestampsPerWindowId = new ConcurrentHashMap();
        this.sumOfCountPerWindowId = new ConcurrentHashMap();
    }

    @Override
    public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<T> inputElement, @UnknownKeyFor @NonNull @Initialized OpEmitter<T> emitter) {
        for (BoundedWindow windowId : inputElement.getWindows()) {
            this.sumOfCountPerWindowId.compute(windowId, (key, value) -> {
                value = value == null ? Long.valueOf(0L) : value;
                value = value + 1L;
                return value;
            });
            this.sumOfTimestampsPerWindowId.compute(windowId, (key, value) -> {
                value = value == null ? BigInteger.ZERO : value;
                return value.add(BigInteger.valueOf(System.nanoTime()));
            });
        }
        switch (this.opType) {
            case INPUT: {
                this.samzaTransformMetricRegistry.getTransformMetrics().getTransformInputThroughput(this.transformFullName).inc();
                break;
            }
            case OUTPUT: {
                this.samzaTransformMetricRegistry.getTransformMetrics().getTransformOutputThroughput(this.transformFullName).inc();
            }
        }
        emitter.emitElement(inputElement);
    }

    @Override
    public void processWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<T> emitter) {
        ArrayList closedWindows = new ArrayList();
        this.sumOfCountPerWindowId.keySet().stream().filter(windowId -> watermark.isAfter((ReadableInstant)windowId.maxTimestamp())).forEach(windowId -> {
            long sumOfTimestamps = this.sumOfTimestampsPerWindowId.get(windowId) != null ? this.sumOfTimestampsPerWindowId.get(windowId).longValue() : 0L;
            long count = this.sumOfCountPerWindowId.get(windowId);
            closedWindows.add(windowId);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing {} Watermark for Transform: {}, WindowId:{}, count: {}, sumOfTimestamps: {}, task: {}", new Object[]{this.opType, this.transformFullName, windowId, count, sumOfTimestamps, this.task});
            }
            if (sumOfTimestamps > 0L && count > 0L) {
                switch (this.opType) {
                    case INPUT: {
                        this.samzaTransformMetricRegistry.updateArrivalTimeMap(this.transformFullName, (BoundedWindow)windowId, Math.floorDiv(sumOfTimestamps, count));
                        break;
                    }
                    case OUTPUT: {
                        this.samzaTransformMetricRegistry.emitLatencyMetric(this.transformFullName, (BoundedWindow)windowId, Math.floorDiv(sumOfTimestamps, count), this.task);
                    }
                }
            }
        });
        ((ConcurrentHashMap.KeySetView)this.sumOfCountPerWindowId.keySet()).removeAll((Collection)closedWindows);
        ((ConcurrentHashMap.KeySetView)this.sumOfTimestampsPerWindowId.keySet()).removeAll((Collection)closedWindows);
        if (this.opType == SamzaMetricOpFactory.OpType.OUTPUT) {
            this.samzaTransformMetricRegistry.getTransformMetrics().getTransformWatermarkProgress(this.transformFullName).set((Object)watermark.getMillis());
        }
        emitter.emitWatermark(watermark);
    }
}

