/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class AsyncIntervalJoinOperator<K, T1, T2, OUT>
extends AbstractAsyncStateUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
implements TwoInputStreamOperator<T1, T2, OUT>,
Triggerable<K, String> {
    private static final long serialVersionUID = -5380774605111543477L;
    private static final Logger logger = LoggerFactory.getLogger(AsyncIntervalJoinOperator.class);
    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
    private final long lowerBound;
    private final long upperBound;
    private final OutputTag<T1> leftLateDataOutputTag;
    private final OutputTag<T2> rightLateDataOutputTag;
    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;
    private transient MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> rightBuffer;
    private transient DeclaredVariable<Long> resultTimestamp;
    private transient DeclaredVariable<Long> leftTimestamp;
    private transient DeclaredVariable<Long> rightTimestamp;
    private transient TimestampedCollectorWithDeclaredVariable<OUT> collector;
    private transient ContextImpl context;
    private transient InternalTimerService<String> internalTimerService;

    public AsyncIntervalJoinOperator(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, OutputTag<T1> leftLateDataOutputTag, OutputTag<T2> rightLateDataOutputTag, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1, T2, OUT> udf) {
        super(Preconditions.checkNotNull(udf));
        Preconditions.checkArgument(lowerBound <= upperBound, "lowerBound <= upperBound must be fulfilled");
        this.lowerBound = lowerBoundInclusive ? lowerBound : lowerBound + 1L;
        this.upperBound = upperBoundInclusive ? upperBound : upperBound - 1L;
        this.leftLateDataOutputTag = leftLateDataOutputTag;
        this.rightLateDataOutputTag = rightLateDataOutputTag;
        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.leftBuffer = this.getRuntimeContext().getMapState(new MapStateDescriptor<Long, T1>(LEFT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<T1>(new IntervalJoinOperator.BufferEntrySerializer<T1>(this.leftTypeSerializer))));
        this.rightBuffer = this.getRuntimeContext().getMapState(new MapStateDescriptor<Long, T2>(RIGHT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<T2>(new IntervalJoinOperator.BufferEntrySerializer<T2>(this.rightTypeSerializer))));
        this.resultTimestamp = this.declarationContext.declareVariable(LongSerializer.INSTANCE, "_AsyncIntervalJoinOperator$resultTime", () -> Long.MIN_VALUE);
        this.leftTimestamp = this.declarationContext.declareVariable(LongSerializer.INSTANCE, "_AsyncIntervalJoinOperator$leftTime", () -> Long.MIN_VALUE);
        this.rightTimestamp = this.declarationContext.declareVariable(LongSerializer.INSTANCE, "_AsyncIntervalJoinOperator$rightTime", () -> Long.MIN_VALUE);
        this.collector = new TimestampedCollectorWithDeclaredVariable(this.output, this.resultTimestamp);
        this.context = new ContextImpl((ProcessJoinFunction)this.userFunction, this.resultTimestamp, this.leftTimestamp, this.rightTimestamp);
        this.internalTimerService = this.getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        this.processElement(record, this.leftBuffer, this.rightBuffer, this.lowerBound, this.upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        this.processElement(record, this.rightBuffer, this.leftBuffer, -this.upperBound, -this.lowerBound, false);
    }

    private <THIS, OTHER> void processElement(StreamRecord<THIS> record, MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, long relativeLowerBound, long relativeUpperBound, boolean isLeft) throws Exception {
        Object ourValue = record.getValue();
        long ourTimestamp = record.getTimestamp();
        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.");
        }
        if (this.isLate(ourTimestamp)) {
            this.sideOutput(ourValue, ourTimestamp, isLeft);
            return;
        }
        AsyncIntervalJoinOperator.addToBuffer(ourBuffer, ourValue, ourTimestamp).thenCompose(empty -> otherBuffer.asyncEntries()).thenCompose(entries -> entries.onNext(bucket -> {
            long timestamp = (Long)bucket.getKey();
            if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) {
                return;
            }
            for (IntervalJoinOperator.BufferEntry entry : (List)bucket.getValue()) {
                if (isLeft) {
                    this.collect(ourValue, entry.getElement(), ourTimestamp, timestamp);
                    continue;
                }
                this.collect(entry.getElement(), ourValue, timestamp, ourTimestamp);
            }
        })).thenAccept(empty -> {
            long cleanupTime;
            long l = cleanupTime = relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;
            if (isLeft) {
                this.internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
            } else {
                this.internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
            }
        });
    }

    private boolean isLate(long timestamp) {
        long currentWatermark = this.internalTimerService.currentWatermark();
        return timestamp < currentWatermark;
    }

    protected <T> void sideOutput(T value, long timestamp, boolean isLeft) {
        if (isLeft) {
            if (this.leftLateDataOutputTag != null) {
                this.output.collect(this.leftLateDataOutputTag, new StreamRecord<T>(value, timestamp));
            }
        } else if (this.rightLateDataOutputTag != null) {
            this.output.collect(this.rightLateDataOutputTag, new StreamRecord<T>(value, timestamp));
        }
    }

    private void collect(T1 left, T2 right, long leftTime, long rightTime) throws Exception {
        this.resultTimestamp.set(Math.max(leftTime, rightTime));
        this.leftTimestamp.set(leftTime);
        this.rightTimestamp.set(rightTime);
        ((ProcessJoinFunction)this.userFunction).processElement(left, right, this.context, this.collector);
    }

    private static <T> StateFuture<Void> addToBuffer(MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer, T value, long timestamp) {
        return buffer.asyncGet(timestamp).thenCompose(elemsInBucket -> {
            if (elemsInBucket == null) {
                elemsInBucket = new ArrayList<IntervalJoinOperator.BufferEntry<Object>>();
            }
            elemsInBucket.add(new IntervalJoinOperator.BufferEntry<Object>(value, false));
            return buffer.asyncPut(timestamp, (List)elemsInBucket);
        });
    }

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {
        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();
        logger.trace("onEventTime @ {}", (Object)timerTimestamp);
        switch (namespace) {
            case "CLEANUP_LEFT": {
                long timestamp = this.upperBound <= 0L ? timerTimestamp : timerTimestamp - this.upperBound;
                logger.trace("Removing from left buffer @ {}", (Object)timestamp);
                this.leftBuffer.remove(timestamp);
                break;
            }
            case "CLEANUP_RIGHT": {
                long timestamp = this.lowerBound <= 0L ? timerTimestamp + this.lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", (Object)timestamp);
                this.rightBuffer.remove(timestamp);
                break;
            }
            default: {
                throw new RuntimeException("Invalid namespace " + namespace);
            }
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
    }

    @VisibleForTesting
    MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {
        return this.leftBuffer;
    }

    @VisibleForTesting
    MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> getRightBuffer() {
        return this.rightBuffer;
    }

    private final class ContextImpl
    extends ProcessJoinFunction.Context {
        private final DeclaredVariable<Long> resultTimestamp;
        private final DeclaredVariable<Long> leftTimestamp;
        private final DeclaredVariable<Long> rightTimestamp;

        private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func, DeclaredVariable<Long> resultTimestamp, DeclaredVariable<Long> leftTimestamp, DeclaredVariable<Long> rightTimestamp) {
            super(func);
            this.resultTimestamp = resultTimestamp;
            this.leftTimestamp = leftTimestamp;
            this.rightTimestamp = rightTimestamp;
        }

        @Override
        public long getLeftTimestamp() {
            return (Long)this.leftTimestamp.get();
        }

        @Override
        public long getRightTimestamp() {
            return (Long)this.rightTimestamp.get();
        }

        @Override
        public long getTimestamp() {
            return (Long)this.resultTimestamp.get();
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
            AsyncIntervalJoinOperator.this.output.collect(outputTag, new StreamRecord<X>(value, this.getTimestamp()));
        }
    }
}

