/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.windowing.sessionwindows;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.test.windowing.sessionwindows.EventGenerator;
import org.apache.flink.test.windowing.sessionwindows.GeneratorConfiguration;
import org.apache.flink.test.windowing.sessionwindows.GeneratorEventFactory;
import org.apache.flink.test.windowing.sessionwindows.LongRandomGenerator;
import org.apache.flink.test.windowing.sessionwindows.SessionConfiguration;
import org.apache.flink.test.windowing.sessionwindows.SessionGeneratorConfiguration;
import org.apache.flink.util.Preconditions;

/**
 * Produces the events of a single session for one key.
 *
 * <p>Events are emitted in three consecutive phases, each implemented by a private inner
 * generator that this class delegates to:
 *
 * <ol>
 *   <li>{@link Timing#TIMELY}: events taken from a list of timestamps computed up front,
 *   <li>{@link Timing#IN_LATENESS}: events emitted once the watermark has reached the trigger
 *       timestamp of the session,
 *   <li>{@link Timing#AFTER_LATENESS}: events emitted once the watermark has reached the trigger
 *       timestamp plus the allowed lateness.
 * </ol>
 *
 * <p>The total number of events is bounded by {@link #hasMoreEvents()}; once it is reached,
 * {@link #getNextGenerator(long)} creates the generator for the follow-up session.
 *
 * @param <K> type of the session key
 * @param <E> type of the generated events
 */
public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {

    /** Source of all random choices made by this generator. */
    private final LongRandomGenerator randomGenerator;

    /** Combined session and generator settings. */
    private final SessionGeneratorConfiguration<K, E> configuration;

    /** Timestamps still available for timely events, in the order they were generated. */
    private final List<Long> orderedTimelyTimestamps;

    /** First timely timestamp of the session. */
    private final long minTimestamp;

    /** Last timely timestamp of the session. */
    private final long maxTimestamp;

    /** Number of events handed out so far; also passed to the event factory for each event. */
    private int producedEventsCount;

    /** Generator of the phase that is currently active. */
    private EventGenerator<K, E> timingAwareEventGenerator;

    /**
     * Creates the generator and precomputes the timestamps of all timely events.
     *
     * @param configuration session and generator settings, must not be null
     * @param randomGenerator random source, must not be null
     */
    public SessionEventGeneratorImpl(
            SessionGeneratorConfiguration<K, E> configuration,
            LongRandomGenerator randomGenerator) {
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(randomGenerator);

        this.configuration = configuration;
        this.randomGenerator = randomGenerator;

        int timelyEventsInSessionCount =
                configuration.getSessionConfiguration().getNumberOfTimelyEvents();
        this.orderedTimelyTimestamps = new ArrayList<Long>(timelyEventsInSessionCount);
        this.minTimestamp = configuration.getSessionConfiguration().getMinEventTimestamp();

        generateOrderedTimelyTimestamps(this.minTimestamp, timelyEventsInSessionCount);

        // The list is never empty at this point (see generateOrderedTimelyTimestamps).
        this.maxTimestamp =
                this.orderedTimelyTimestamps.get(this.orderedTimelyTimestamps.size() - 1);
        this.timingAwareEventGenerator = new TimelyGenerator();
    }

    /** Delegates to the generator of the currently active phase. */
    @Override
    public boolean canGenerateEventAtWatermark(long globalWatermark) {
        return timingAwareEventGenerator.canGenerateEventAtWatermark(globalWatermark);
    }

    /**
     * Generates the next event of this session and switches to the next phase when the current
     * one is exhausted.
     *
     * @param globalWatermark current global watermark, forwarded to the event factory
     * @return the generated event
     * @throws IllegalStateException if all events of this session were already produced
     */
    @Override
    public E generateEvent(long globalWatermark) {
        if (!hasMoreEvents()) {
            throw new IllegalStateException("All events exhausted");
        }

        // Incremented first so the factory sees the count including the event being created.
        ++producedEventsCount;
        E event = timingAwareEventGenerator.generateEvent(globalWatermark);

        // A phase may have nothing (left) to emit, so skip ahead until one does. The
        // after-lateness phase always reports more events, which ends this loop.
        while (!timingAwareEventGenerator.hasMoreEvents()) {
            timingAwareEventGenerator =
                    timingAwareEventGenerator.getNextGenerator(globalWatermark);
        }
        return event;
    }

    /** Delegates to the generator of the currently active phase. */
    @Override
    public long getLocalWatermark() {
        return timingAwareEventGenerator.getLocalWatermark();
    }

    /** Returns whether fewer events than the configured total have been produced. */
    @Override
    public boolean hasMoreEvents() {
        return producedEventsCount < getAllEventsCount();
    }

    /**
     * Fills {@link #orderedTimelyTimestamps}, starting at {@code minTimestamp} and advancing each
     * following timestamp by {@code randomLongBetween(0, gap - 1)}.
     *
     * <p>At least one timestamp is always added, even if {@code onTimeEventCountInSession} is
     * smaller than one.
     */
    private void generateOrderedTimelyTimestamps(long minTimestamp, int onTimeEventCountInSession) {
        long generatedTimestamp = minTimestamp;
        for (int i = 1; i < onTimeEventCountInSession; ++i) {
            orderedTimelyTimestamps.add(generatedTimestamp);
            // NOTE(review): presumably keeps consecutive events closer than the session gap;
            // the exact bound semantics are defined by LongRandomGenerator.
            generatedTimestamp += randomGenerator.randomLongBetween(0L, getGap() - 1L);
        }
        orderedTimelyTimestamps.add(generatedTimestamp);
    }

    /** Builds an event through the configured factory using the current produced-events count. */
    private E createEventFromTimestamp(long eventTimestamp, long globalWatermark, Timing timing) {
        return getEventFactory()
                .createEvent(
                        getKey(),
                        getSessionId(),
                        producedEventsCount,
                        eventTimestamp,
                        globalWatermark,
                        timing);
    }

    /** Removes and returns one of the remaining timely timestamps, chosen by the random generator. */
    private long generateTimelyInSessionTimestamp() {
        int chosenTimestampIndex = randomGenerator.choseRandomIndex(orderedTimelyTimestamps);
        return orderedTimelyTimestamps.remove(chosenTimestampIndex);
    }

    /** Returns {@code randomLongBetween(minTimestamp, maxTimestamp + 1)}. */
    private long generateArbitraryInSessionTimestamp() {
        return randomGenerator.randomLongBetween(minTimestamp, maxTimestamp + 1L);
    }

    /** Returns whether the watermark has reached the trigger timestamp of this session. */
    private boolean isTriggered(long globalWatermark) {
        return globalWatermark >= getTriggerTimestamp();
    }

    /** Returns whether the watermark has reached the trigger timestamp plus allowed lateness. */
    private boolean isAfterLateness(long globalWatermark) {
        return globalWatermark >= getAfterLatenessTimestamp();
    }

    private long getAfterLatenessTimestamp() {
        return getTriggerTimestamp() + getLateness();
    }

    /** Returns {@code maxTimestamp + gap - 1}. */
    private long getTriggerTimestamp() {
        return maxTimestamp + getGap() - 1L;
    }

    @Override
    public K getKey() {
        return configuration.getSessionConfiguration().getKey();
    }

    private long getGap() {
        return configuration.getSessionConfiguration().getGap();
    }

    private long getLateness() {
        return configuration.getGeneratorConfiguration().getAllowedLateness();
    }

    private GeneratorEventFactory<K, E> getEventFactory() {
        return configuration.getSessionConfiguration().getEventFactory();
    }

    private int getSessionId() {
        return configuration.getSessionConfiguration().getSessionId();
    }

    private int getTimelyEventsCount() {
        return configuration.getSessionConfiguration().getNumberOfTimelyEvents();
    }

    /**
     * Returns the cumulative event count at the end of the in-lateness phase, i.e. the number of
     * timely events plus the number of late events within the allowed lateness.
     */
    private int getLateEventsCount() {
        return getTimelyEventsCount()
                + configuration.getGeneratorConfiguration().getLateEventsWithinLateness();
    }

    /** Returns the total event count of the session across all three phases. */
    private int getAllEventsCount() {
        return getLateEventsCount()
                + configuration.getGeneratorConfiguration().getLateEventsAfterLateness();
    }

    private boolean hasMoreTimelyEvents() {
        return !orderedTimelyTimestamps.isEmpty();
    }

    private boolean hasMoreInLatenessEvents() {
        return producedEventsCount < getLateEventsCount();
    }

    /**
     * Creates the generator for the follow-up session.
     *
     * <p>The follow-up session starts at the after-lateness timestamp of this session plus
     * {@code randomLongBetween(0, maxAdditionalSessionGap)}, or at {@code globalWatermark} if
     * that is larger.
     *
     * @param globalWatermark current global watermark
     * @return a new generator that shares this generator's random source and generator settings
     */
    @Override
    public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
        GeneratorConfiguration generatorConfiguration = configuration.getGeneratorConfiguration();
        SessionConfiguration<K, E> sessionConfiguration = configuration.getSessionConfiguration();

        long maxAdditionalGap = generatorConfiguration.getMaxAdditionalSessionGap();
        long earliestStartTime =
                getAfterLatenessTimestamp()
                        + randomGenerator.randomLongBetween(0L, maxAdditionalGap);
        long nextStartTime = Math.max(earliestStartTime, globalWatermark);

        SessionConfiguration<K, E> followupSessionConfiguration =
                sessionConfiguration.getFollowupSessionConfiguration(nextStartTime);
        SessionGeneratorConfiguration<K, E> followupConfiguration =
                new SessionGeneratorConfiguration<K, E>(
                        followupSessionConfiguration, generatorConfiguration);
        return new SessionEventGeneratorImpl<K, E>(followupConfiguration, randomGenerator);
    }

    /** Common base of the per-phase generators; all of them report the key of this session. */
    private abstract class AbstractEventGenerator implements EventGenerator<K, E> {

        @Override
        public K getKey() {
            return SessionEventGeneratorImpl.this.configuration.getSessionConfiguration().getKey();
        }
    }

    /** First phase: emits the precomputed timely timestamps until none are left. */
    private class TimelyGenerator extends AbstractEventGenerator {

        @Override
        public E generateEvent(long globalWatermark) {
            return SessionEventGeneratorImpl.this.createEventFromTimestamp(
                    SessionEventGeneratorImpl.this.generateTimelyInSessionTimestamp(),
                    globalWatermark,
                    Timing.TIMELY);
        }

        /** Returns the first remaining entry of the ordered timely timestamps. */
        @Override
        public long getLocalWatermark() {
            return SessionEventGeneratorImpl.this.orderedTimelyTimestamps.get(0);
        }

        /** Timely events are not gated by the watermark. */
        @Override
        public boolean canGenerateEventAtWatermark(long globalWatermark) {
            return true;
        }

        @Override
        public boolean hasMoreEvents() {
            return SessionEventGeneratorImpl.this.hasMoreTimelyEvents();
        }

        @Override
        public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
            return new InLatenessGenerator();
        }
    }

    /** Second phase: emits events with arbitrary in-session timestamps after the trigger. */
    private class InLatenessGenerator extends AbstractEventGenerator {

        @Override
        public E generateEvent(long globalWatermark) {
            return SessionEventGeneratorImpl.this.createEventFromTimestamp(
                    SessionEventGeneratorImpl.this.generateArbitraryInSessionTimestamp(),
                    globalWatermark,
                    Timing.IN_LATENESS);
        }

        /** Returns the after-lateness timestamp minus one. */
        @Override
        public long getLocalWatermark() {
            return SessionEventGeneratorImpl.this.getAfterLatenessTimestamp() - 1L;
        }

        @Override
        public boolean canGenerateEventAtWatermark(long globalWatermark) {
            return SessionEventGeneratorImpl.this.isTriggered(globalWatermark);
        }

        @Override
        public boolean hasMoreEvents() {
            return SessionEventGeneratorImpl.this.hasMoreInLatenessEvents();
        }

        @Override
        public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
            return new AfterLatenessGenerator();
        }
    }

    /** Third phase: emits events with arbitrary in-session timestamps after allowed lateness. */
    private class AfterLatenessGenerator extends AbstractEventGenerator {

        @Override
        public E generateEvent(long globalWatermark) {
            return SessionEventGeneratorImpl.this.createEventFromTimestamp(
                    SessionEventGeneratorImpl.this.generateArbitraryInSessionTimestamp(),
                    globalWatermark,
                    Timing.AFTER_LATENESS);
        }

        @Override
        public long getLocalWatermark() {
            return SessionEventGeneratorImpl.this.getAfterLatenessTimestamp();
        }

        @Override
        public boolean canGenerateEventAtWatermark(long globalWatermark) {
            return SessionEventGeneratorImpl.this.isAfterLateness(globalWatermark);
        }

        /**
         * Always {@code true}: this is the last phase, and the total number of events is limited
         * by the enclosing generator's {@code hasMoreEvents()} instead.
         */
        @Override
        public boolean hasMoreEvents() {
            return true;
        }

        @Override
        public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
            throw new IllegalStateException("This generator has no successor");
        }
    }

    /** Phase in which an event was generated, relative to the trigger of its session. */
    public enum Timing {
        TIMELY,
        IN_LATENESS,
        AFTER_LATENESS
    }
}

