/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.windowing;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.Gson;
import org.apache.pulsar.functions.runtime.shaded.net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.windowing.Event;
import org.apache.pulsar.functions.windowing.EvictionPolicy;
import org.apache.pulsar.functions.windowing.TimestampExtractor;
import org.apache.pulsar.functions.windowing.TriggerPolicy;
import org.apache.pulsar.functions.windowing.WaterMarkEventGenerator;
import org.apache.pulsar.functions.windowing.Window;
import org.apache.pulsar.functions.windowing.WindowContextImpl;
import org.apache.pulsar.functions.windowing.WindowImpl;
import org.apache.pulsar.functions.windowing.WindowLifecycleListener;
import org.apache.pulsar.functions.windowing.WindowManager;
import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.WatermarkTimeEvictionPolicy;
import org.apache.pulsar.functions.windowing.triggers.CountTriggerPolicy;
import org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy;
import org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
import org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowFunctionExecutor<T, X>
implements org.apache.pulsar.functions.api.Function<T, X> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WindowFunctionExecutor.class);
    private boolean initialized;
    protected WindowConfig windowConfig;
    private WindowManager<Record<T>> windowManager;
    private TimestampExtractor<T> timestampExtractor;
    protected transient WaterMarkEventGenerator<Record<T>> waterMarkEventGenerator;
    protected Function<Collection<T>, X> bareWindowFunction;
    protected WindowFunction<T, X> windowFunction;

    @Override
    public void initialize(Context context) {
        this.windowConfig = this.getWindowConfigs(context);
        this.initializeUserFunction(this.windowConfig);
        log.info("Window Config: {}", (Object)this.windowConfig);
        this.windowManager = this.getWindowManager(this.windowConfig, context);
        this.initialized = true;
        this.start();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void initializeUserFunction(WindowConfig windowConfig) {
        ClassLoader clsLoader;
        String actualWindowFunctionClassName = windowConfig.getActualWindowFunctionClassName();
        Object userClassObject = Reflections.createInstance(actualWindowFunctionClassName, clsLoader = Thread.currentThread().getContextClassLoader());
        if (userClassObject instanceof Function) {
            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, userClassObject.getClass());
            if (!typeArgs[0].equals(Collection.class)) throw new IllegalArgumentException("Window function must take a collection as input");
            this.bareWindowFunction = (Function)userClassObject;
            return;
        } else {
            if (!(userClassObject instanceof WindowFunction)) throw new IllegalArgumentException("Window function does not implement the correct interface");
            this.windowFunction = (WindowFunction)userClassObject;
        }
    }

    private WindowConfig getWindowConfigs(Context context) {
        if (!context.getUserConfigValue("__WINDOWCONFIGS__").isPresent()) {
            throw new IllegalArgumentException("Window Configs cannot be found");
        }
        WindowConfig windowConfig = new Gson().fromJson(new Gson().toJson(context.getUserConfigValue("__WINDOWCONFIGS__").get()), WindowConfig.class);
        return windowConfig;
    }

    private WindowManager<Record<T>> getWindowManager(WindowConfig windowConfig, Context context) {
        WindowLifecycleListener lifecycleListener = this.newWindowLifecycleListener(context);
        WindowManager<Record<Record<T>>> manager = new WindowManager<Record<Record<T>>>(lifecycleListener, new ConcurrentLinkedQueue());
        if (this.windowConfig.getTimestampExtractorClassName() != null) {
            this.timestampExtractor = this.getTimeStampExtractor(windowConfig);
            this.waterMarkEventGenerator = new WaterMarkEventGenerator<Record<Record<T>>>(manager, this.windowConfig.getWatermarkEmitIntervalMs(), this.windowConfig.getMaxLagMs(), new HashSet<String>(context.getInputTopics()), context);
        } else if (this.windowConfig.getLateDataTopic() != null) {
            throw new IllegalArgumentException("Late data topic can be defined only when specifying a timestamp extractor class");
        }
        EvictionPolicy<Record<T>, ?> evictionPolicy = this.getEvictionPolicy(windowConfig);
        TriggerPolicy<Record<T>, ?> triggerPolicy = this.getTriggerPolicy(windowConfig, manager, evictionPolicy, context);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    private TimestampExtractor<T> getTimeStampExtractor(WindowConfig windowConfig) {
        Object result;
        Class<?> theCls;
        try {
            theCls = Class.forName(windowConfig.getTimestampExtractorClassName(), true, Thread.currentThread().getContextClassLoader());
        }
        catch (ClassNotFoundException | NoClassDefFoundError cnfe) {
            throw new RuntimeException(String.format("Timestamp extractor class %s must be in class path", windowConfig.getTimestampExtractorClassName()), cnfe);
        }
        try {
            Constructor<?> constructor = theCls.getDeclaredConstructor(new Class[0]);
            constructor.setAccessible(true);
            result = constructor.newInstance(new Object[0]);
        }
        catch (InstantiationException ie) {
            throw new RuntimeException("User class must be concrete", ie);
        }
        catch (NoSuchMethodException e) {
            throw new RuntimeException("User class doesn't have such method", e);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException("User class must have a no-arg constructor", e);
        }
        catch (InvocationTargetException e) {
            throw new RuntimeException("User class constructor throws exception", e);
        }
        Class<?>[] timestampExtractorTypeArgs = TypeResolver.resolveRawArguments(TimestampExtractor.class, result.getClass());
        Class<?>[] typeArgs = TypeResolver.resolveRawArguments(org.apache.pulsar.functions.api.Function.class, this.getClass());
        if (!typeArgs[0].equals(timestampExtractorTypeArgs[0])) {
            throw new RuntimeException("Inconsistent types found between function input type and timestamp extractor type:  function type = " + String.valueOf(typeArgs[0]) + ", timestamp extractor type = " + String.valueOf(timestampExtractorTypeArgs[0]));
        }
        return (TimestampExtractor)result;
    }

    private TriggerPolicy<Record<T>, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<Record<T>> manager, EvictionPolicy<Record<T>, ?> evictionPolicy, Context context) {
        if (windowConfig.getSlidingIntervalCount() != null) {
            if (this.isEventTime()) {
                return new WatermarkCountTriggerPolicy<Record<T>>(windowConfig.getSlidingIntervalCount(), manager, evictionPolicy, manager);
            }
            return new CountTriggerPolicy<Record<T>>(windowConfig.getSlidingIntervalCount(), manager, evictionPolicy);
        }
        if (this.isEventTime()) {
            return new WatermarkTimeTriggerPolicy<Record<T>>(windowConfig.getSlidingIntervalDurationMs(), manager, evictionPolicy, manager);
        }
        return new TimeTriggerPolicy<Record<T>>(windowConfig.getSlidingIntervalDurationMs(), manager, evictionPolicy, context);
    }

    private EvictionPolicy<Record<T>, ?> getEvictionPolicy(WindowConfig windowConfig) {
        if (windowConfig.getWindowLengthCount() != null) {
            if (this.isEventTime()) {
                return new WatermarkCountEvictionPolicy<Record<T>>(windowConfig.getWindowLengthCount());
            }
            return new CountEvictionPolicy<Record<T>>(windowConfig.getWindowLengthCount());
        }
        if (this.isEventTime()) {
            return new WatermarkTimeEvictionPolicy<Record<T>>(windowConfig.getWindowLengthDurationMs(), windowConfig.getMaxLagMs());
        }
        return new TimeEvictionPolicy<Record<T>>(windowConfig.getWindowLengthDurationMs());
    }

    protected WindowLifecycleListener<Event<Record<T>>> newWindowLifecycleListener(final Context context) {
        return new WindowLifecycleListener<Event<Record<T>>>(){

            @Override
            public void onExpiry(List<Event<Record<T>>> events) {
            }

            @Override
            public void onActivation(List<Event<Record<T>>> tuples, List<Event<Record<T>>> newTuples, List<Event<Record<T>>> expiredTuples, Long referenceTime) {
                WindowFunctionExecutor.this.processWindow(context, tuples.stream().map(event -> (Record)event.get()).collect(Collectors.toList()), newTuples.stream().map(event -> (Record)event.get()).collect(Collectors.toList()), expiredTuples.stream().map(event -> (Record)event.get()).collect(Collectors.toList()), referenceTime);
            }
        };
    }

    private void processWindow(Context context, List<Record<T>> tuples, List<Record<T>> newTuples, List<Record<T>> expiredTuples, Long referenceTime) {
        Object output = null;
        try {
            output = this.process(new WindowImpl<Record<T>>(tuples, newTuples, expiredTuples, this.getWindowStartTs(referenceTime), referenceTime), new WindowContextImpl(context));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (output != null) {
            context.publish(context.getOutputTopic(), output, context.getOutputSchemaType()).thenAccept(__ -> {
                if (this.windowConfig.getProcessingGuarantees() == WindowConfig.ProcessingGuarantees.ATLEAST_ONCE) {
                    for (Record record : tuples) {
                        record.ack();
                    }
                }
            });
        } else if (this.windowConfig.getProcessingGuarantees() == WindowConfig.ProcessingGuarantees.ATLEAST_ONCE) {
            for (Record<T> record : tuples) {
                record.ack();
            }
        }
    }

    private Long getWindowStartTs(Long endTs) {
        Long res = null;
        if (endTs != null && this.windowConfig.getWindowLengthDurationMs() != null) {
            res = endTs - this.windowConfig.getWindowLengthDurationMs();
        }
        return res;
    }

    private void start() {
        if (this.waterMarkEventGenerator != null) {
            log.debug("Starting waterMarkEventGenerator");
            this.waterMarkEventGenerator.start();
        }
        log.debug("Starting trigger policy");
        this.windowManager.triggerPolicy.start();
    }

    public void shutdown() {
        if (this.waterMarkEventGenerator != null) {
            this.waterMarkEventGenerator.shutdown();
        }
        if (this.windowManager != null) {
            this.windowManager.shutdown();
        }
    }

    private boolean isEventTime() {
        return this.timestampExtractor != null;
    }

    @Override
    public X process(T input, Context context) throws Exception {
        if (!this.initialized) {
            this.initialize(context);
        }
        Record<?> record = context.getCurrentRecord();
        if (this.windowConfig.getProcessingGuarantees() == WindowConfig.ProcessingGuarantees.ATMOST_ONCE) {
            record.ack();
        }
        if (this.isEventTime()) {
            long ts = this.timestampExtractor.extractTimestamp(record.getValue());
            if (this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
                this.windowManager.add(record, ts, record);
            } else if (this.windowConfig.getLateDataTopic() != null) {
                context.newOutputMessage(this.windowConfig.getLateDataTopic(), null).value(input).sendAsync();
            } else {
                log.info(String.format("Received a late tuple %s with ts %d. This will not be processed.", input, ts));
            }
        } else {
            this.windowManager.add(record, System.currentTimeMillis(), record);
        }
        return null;
    }

    public X process(Window<Record<T>> inputWindow, WindowContext context) throws Exception {
        if (this.bareWindowFunction != null) {
            Collection newCollection = inputWindow.get().stream().map(Record::getValue).collect(Collectors.toList());
            return this.bareWindowFunction.apply(newCollection);
        }
        return this.windowFunction.process(inputWindow.get(), context);
    }
}

