/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.oplet.plumbing;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Functions;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.oplet.OpletContext;
import org.apache.edgent.oplet.core.Pipe;
import org.apache.edgent.window.Partition;
import org.apache.edgent.window.PartitionedState;
import org.apache.edgent.window.Policies;
import org.apache.edgent.window.Window;
import org.apache.edgent.window.Windows;

public class PressureReliever<T, K>
extends Pipe<T, T> {
    private static final long serialVersionUID = 1L;
    private ScheduledExecutorService executor;
    private final Window<T, K, LinkedList<T>> window;

    public PressureReliever(int count, Function<T, K> keyFunction) {
        this.window = Windows.window((BiFunction)Policies.alwaysInsert(), (BiConsumer)Policies.countContentsPolicy((int)count), (Consumer)Policies.evictOldest(), (BiConsumer)new FirstSubmitter(), keyFunction, (Supplier & Serializable)() -> new LinkedList());
        this.window.registerPartitionProcessor((BiConsumer & Serializable)(tuples, k) -> {});
    }

    @Override
    public void initialize(OpletContext<T, T> context) {
        super.initialize(context);
        this.executor = context.getService(ScheduledExecutorService.class);
    }

    public void accept(T tuple) {
        this.window.insert(tuple);
    }

    @Override
    public void close() throws Exception {
    }

    private class FirstSubmitter
    extends PartitionedState<K, AtomicBoolean>
    implements BiConsumer<Partition<T, K, LinkedList<T>>, T> {
        private static final long serialVersionUID = 1L;

        FirstSubmitter() {
            super((Supplier & Serializable)() -> new AtomicBoolean());
        }

        public void accept(Partition<T, K, LinkedList<T>> partition, T tuple) {
            this.submitNextTuple(partition);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void submitNextTuple(Partition<T, K, LinkedList<T>> partition) {
            Object firstTuple;
            Object key = partition.getKey();
            AtomicBoolean latch = (AtomicBoolean)this.getState(key);
            if (!latch.compareAndSet(false, true)) {
                return;
            }
            Partition partition2 = partition;
            synchronized (partition2) {
                LinkedList contents = (LinkedList)partition.getContents();
                if (contents.isEmpty()) {
                    latch.set(false);
                    return;
                }
                firstTuple = contents.removeFirst();
            }
            Runnable submit = Functions.delayedConsume((Consumer)PressureReliever.this.getDestination(), firstTuple);
            submit = Functions.runWithFinal((Runnable)submit, () -> {
                latch.set(false);
                this.submitNextTuple(partition);
            });
            PressureReliever.this.executor.execute(submit);
        }
    }
}

