/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
implements TwoInputStreamOperator<T1, T2, OUT>,
Triggerable<K, String> {
    private static final long serialVersionUID = -5380774605111543454L;
    private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);
    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
    private final long lowerBound;
    private final long upperBound;
    private final OutputTag<T1> leftLateDataOutputTag;
    private final OutputTag<T2> rightLateDataOutputTag;
    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;
    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;
    private transient InternalTimerService<String> internalTimerService;

    public IntervalJoinOperator(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, OutputTag<T1> leftLateDataOutputTag, OutputTag<T2> rightLateDataOutputTag, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1, T2, OUT> udf) {
        super(Preconditions.checkNotNull(udf));
        Preconditions.checkArgument(lowerBound <= upperBound, "lowerBound <= upperBound must be fulfilled");
        this.lowerBound = lowerBoundInclusive ? lowerBound : lowerBound + 1L;
        this.upperBound = upperBoundInclusive ? upperBound : upperBound - 1L;
        this.leftLateDataOutputTag = leftLateDataOutputTag;
        this.rightLateDataOutputTag = rightLateDataOutputTag;
        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
        this.context = new ContextImpl((ProcessJoinFunction)this.userFunction);
        this.internalTimerService = this.getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<Long, T1>(LEFT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<T1>(new BufferEntrySerializer<T1>(this.leftTypeSerializer))));
        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<Long, T2>(RIGHT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<T2>(new BufferEntrySerializer<T2>(this.rightTypeSerializer))));
    }

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        this.processElement(record, this.leftBuffer, this.rightBuffer, this.lowerBound, this.upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        this.processElement(record, this.rightBuffer, this.leftBuffer, -this.upperBound, -this.lowerBound, false);
    }

    private <THIS, OTHER> void processElement(StreamRecord<THIS> record, MapState<Long, List<BufferEntry<THIS>>> ourBuffer, MapState<Long, List<BufferEntry<OTHER>>> otherBuffer, long relativeLowerBound, long relativeUpperBound, boolean isLeft) throws Exception {
        long cleanupTime;
        THIS ourValue = record.getValue();
        long ourTimestamp = record.getTimestamp();
        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.");
        }
        if (this.isLate(ourTimestamp)) {
            this.sideOutput(ourValue, ourTimestamp, isLeft);
            return;
        }
        IntervalJoinOperator.addToBuffer(ourBuffer, ourValue, ourTimestamp);
        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
            long timestamp = bucket.getKey();
            if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) continue;
            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                if (isLeft) {
                    this.collect(ourValue, entry.element, ourTimestamp, timestamp);
                    continue;
                }
                this.collect(entry.element, ourValue, timestamp, ourTimestamp);
            }
        }
        long l = cleanupTime = relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            this.internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            this.internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    private boolean isLate(long timestamp) {
        long currentWatermark = this.internalTimerService.currentWatermark();
        return timestamp < currentWatermark;
    }

    protected <T> void sideOutput(T value, long timestamp, boolean isLeft) {
        if (isLeft) {
            if (this.leftLateDataOutputTag != null) {
                this.output.collect(this.leftLateDataOutputTag, new StreamRecord<T>(value, timestamp));
            }
        } else if (this.rightLateDataOutputTag != null) {
            this.output.collect(this.rightLateDataOutputTag, new StreamRecord<T>(value, timestamp));
        }
    }

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
        this.collector.setAbsoluteTimestamp(resultTimestamp);
        this.context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
        ((ProcessJoinFunction)this.userFunction).processElement(left, right, this.context, this.collector);
    }

    private static <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<BufferEntry<T>>();
        }
        elemsInBucket.add(new BufferEntry<T>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {
        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();
        logger.trace("onEventTime @ {}", (Object)timerTimestamp);
        switch (namespace) {
            case "CLEANUP_LEFT": {
                long timestamp = this.upperBound <= 0L ? timerTimestamp : timerTimestamp - this.upperBound;
                logger.trace("Removing from left buffer @ {}", (Object)timestamp);
                this.leftBuffer.remove(timestamp);
                break;
            }
            case "CLEANUP_RIGHT": {
                long timestamp = this.lowerBound <= 0L ? timerTimestamp + this.lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", (Object)timestamp);
                this.rightBuffer.remove(timestamp);
                break;
            }
            default: {
                throw new RuntimeException("Invalid namespace " + namespace);
            }
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
    }

    @VisibleForTesting
    MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
        return this.leftBuffer;
    }

    @VisibleForTesting
    MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
        return this.rightBuffer;
    }

    public static final class BufferEntrySerializerSnapshot<T>
    extends CompositeTypeSerializerSnapshot<BufferEntry<T>, BufferEntrySerializer<T>> {
        private static final int VERSION = 2;

        public BufferEntrySerializerSnapshot() {
        }

        BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) {
            super(serializerInstance);
        }

        @Override
        protected int getCurrentOuterSnapshotVersion() {
            return 2;
        }

        @Override
        protected TypeSerializer<?>[] getNestedSerializers(BufferEntrySerializer<T> outerSerializer) {
            return new TypeSerializer[]{outerSerializer.elementSerializer};
        }

        @Override
        protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
            return new BufferEntrySerializer(nestedSerializers[0]);
        }
    }

    @Internal
    @VisibleForTesting
    public static class BufferEntrySerializer<T>
    extends TypeSerializer<BufferEntry<T>> {
        private static final long serialVersionUID = -20197698803836236L;
        private final TypeSerializer<T> elementSerializer;

        public BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
            this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
        }

        @Override
        public boolean isImmutableType() {
            return true;
        }

        @Override
        public TypeSerializer<BufferEntry<T>> duplicate() {
            return new BufferEntrySerializer<T>(this.elementSerializer.duplicate());
        }

        @Override
        public BufferEntry<T> createInstance() {
            return null;
        }

        @Override
        public BufferEntry<T> copy(BufferEntry<T> from) {
            return new BufferEntry(from.element, from.hasBeenJoined);
        }

        @Override
        public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) {
            return this.copy(from);
        }

        @Override
        public int getLength() {
            return -1;
        }

        @Override
        public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException {
            target.writeBoolean(record.hasBeenJoined);
            this.elementSerializer.serialize(record.element, target);
        }

        @Override
        public BufferEntry<T> deserialize(DataInputView source) throws IOException {
            boolean hasBeenJoined = source.readBoolean();
            T element = this.elementSerializer.deserialize(source);
            return new BufferEntry<T>(element, hasBeenJoined);
        }

        @Override
        public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException {
            return this.deserialize(source);
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeBoolean(source.readBoolean());
            this.elementSerializer.copy(source, target);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            BufferEntrySerializer that = (BufferEntrySerializer)o;
            return Objects.equals(this.elementSerializer, that.elementSerializer);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.elementSerializer);
        }

        @Override
        public TypeSerializerSnapshot<BufferEntry<T>> snapshotConfiguration() {
            return new BufferEntrySerializerSnapshot(this);
        }
    }

    @Internal
    @VisibleForTesting
    public static class BufferEntry<T> {
        private final T element;
        private final boolean hasBeenJoined;

        public BufferEntry(T element, boolean hasBeenJoined) {
            this.element = element;
            this.hasBeenJoined = hasBeenJoined;
        }

        public T getElement() {
            return this.element;
        }

        @VisibleForTesting
        public boolean hasBeenJoined() {
            return this.hasBeenJoined;
        }
    }

    private final class ContextImpl
    extends ProcessJoinFunction.Context {
        private long resultTimestamp = Long.MIN_VALUE;
        private long leftTimestamp = Long.MIN_VALUE;
        private long rightTimestamp = Long.MIN_VALUE;

        private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) {
        }

        private void updateTimestamps(long left, long right, long result) {
            this.leftTimestamp = left;
            this.rightTimestamp = right;
            this.resultTimestamp = result;
        }

        @Override
        public long getLeftTimestamp() {
            return this.leftTimestamp;
        }

        @Override
        public long getRightTimestamp() {
            return this.rightTimestamp;
        }

        @Override
        public long getTimestamp() {
            return this.resultTimestamp;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
            IntervalJoinOperator.this.output.collect(outputTag, new StreamRecord<X>(value, this.getTimestamp()));
        }
    }
}

