/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.data.async;

import com.aol.cyclops.data.async.Adapter;
import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.data.async.Topic;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

public class Signal<T> {
    private final AtomicReference<T> discreteState = new AtomicReference<Object>(null);
    private final Adapter<T> continuous;
    private final Adapter<T> discrete;

    public Signal(Adapter<T> continuous, Adapter<T> discrete) {
        this.continuous = continuous;
        this.discrete = discrete;
    }

    public static <T> Signal<T> queueBackedSignal() {
        return new Signal(new Queue(new LinkedBlockingQueue(), null), new Queue(new LinkedBlockingQueue(), null));
    }

    public static <T> Signal<T> topicBackedSignal() {
        return new Signal(new Topic(), new Topic());
    }

    public void fromStream(Stream<T> stream) {
        stream.forEach(next -> this.set(next));
    }

    public T set(T newValue) {
        this.continuous.offer(newValue);
        this.setDiscreteIfDiff(newValue);
        return newValue;
    }

    private void setDiscreteIfDiff(T newValue) {
        T oldVal = this.discreteState.get();
        while (!this.discreteState.compareAndSet(oldVal, newValue)) {
            oldVal = this.discreteState.get();
        }
        if (!Objects.equals(oldVal, newValue)) {
            this.discrete.offer(newValue);
        }
    }

    public void close() {
        this.continuous.close();
        this.discrete.close();
    }

    public Adapter<T> getContinuous() {
        return this.continuous;
    }

    public Adapter<T> getDiscrete() {
        return this.discrete;
    }
}

