/*
 * Decompiled with CFR 0.152.
 */
package io.nextop.client.node;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import io.nextop.Id;
import io.nextop.Message;
import io.nextop.Route;
import io.nextop.client.MessageContext;
import io.nextop.client.MessageControl;
import io.nextop.client.MessageControlChannel;
import io.nextop.client.MessageControlNode;
import io.nextop.client.MessageControlState;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

/**
 * Top of a chain of {@link MessageControlNode}s.
 *
 * <p>Outgoing operations ({@link #send}, {@link #complete}, {@link #error},
 * {@link #cancelSend}) are posted through the {@link MessageContext} and then
 * handed to the downstream node. Controls that the downstream node reports back
 * through the channel installed in
 * {@link #init(MessageControlChannel, MessageControlNode.Bundle)} are delivered
 * to subscribers of {@link #receive(Route)} / {@link #defaultReceive()} on a
 * worker created from the callback scheduler.
 */
public class Head
implements MessageControlNode {
    final MessageContext context;
    final MessageControlState mcs;
    final MessageControlNode downstream;
    final Scheduler callbackScheduler;
    // NOTE(review): nothing in this class unsubscribes this worker - confirm who owns its lifetime.
    final Scheduler.Worker callbackWorker;
    /** Guards {@link #receivers} and {@link #defaultReceivers}. */
    final Object receiverMutex = new Object();
    /** Subscribers registered per route, in subscription order. */
    final ListMultimap<Route, Subscriber<? super Message>> receivers = ArrayListMultimap.create();
    /** Subscribers that may receive messages whose route has no dedicated subscriber. */
    final List<Subscriber<? super Message>> defaultReceivers = new ArrayList<Subscriber<? super Message>>();

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param context used to post work and to obtain the scheduler
     * @param mcs control state exposed via {@link #getMessageControlState()}
     * @param downstream node that all controls are forwarded to
     * @param callbackScheduler scheduler whose worker delivers subscriber callbacks
     * @return a new head
     */
    public static Head create(MessageContext context, MessageControlState mcs, MessageControlNode downstream, Scheduler callbackScheduler) {
        return new Head(context, mcs, downstream, callbackScheduler);
    }

    Head(MessageContext context, MessageControlState mcs, MessageControlNode downstream, Scheduler callbackScheduler) {
        this.context = context;
        this.mcs = mcs;
        this.downstream = downstream;
        this.callbackScheduler = callbackScheduler;
        this.callbackWorker = callbackScheduler.createWorker();
    }

    /**
     * Marks the message id as pending, then posts a task that forwards
     * {@code MessageControl.send(message)} downstream.
     *
     * @param message the message to send
     */
    public void send(final Message message) {
        this.mcs.notifyPending(message.id);
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.onMessageControl(MessageControl.send(message));
            }
        });
    }

    /**
     * Marks the message id as pending, then posts a task that forwards a
     * {@code COMPLETE} control for the message downstream.
     *
     * @param message the message carried by the control
     */
    public void complete(final Message message) {
        this.mcs.notifyPending(message.id);
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.onMessageControl(MessageControl.send(MessageControl.Type.COMPLETE, message));
            }
        });
    }

    /**
     * Marks the message id as pending, then posts a task that forwards an
     * {@code ERROR} control for the message downstream.
     *
     * @param message the message carried by the control
     */
    public void error(final Message message) {
        this.mcs.notifyPending(message.id);
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.onMessageControl(MessageControl.send(MessageControl.Type.ERROR, message));
            }
        });
    }

    /**
     * Posts a task that forwards an {@code ERROR} control addressed to
     * {@code Message.outboxRoute(id)} downstream. Unlike the other send
     * operations, this does not mark the id as pending.
     *
     * @param id id of the send to cancel
     */
    public void cancelSend(final Id id) {
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.onMessageControl(MessageControl.send(MessageControl.Type.ERROR, Message.outboxRoute(id)));
            }
        });
    }

    /**
     * Returns an observable whose subscribers are registered for {@code route}
     * while subscribed and removed again on unsubscribe.
     *
     * @param route the route to listen on
     * @return an observable of messages dispatched to this route
     */
    public Observable<Message> receive(final Route route) {
        return Observable.create(new Observable.OnSubscribe<Message>() {
            @Override
            public void call(final Subscriber<? super Message> subscriber) {
                synchronized (Head.this.receiverMutex) {
                    boolean added = Head.this.receivers.put(route, subscriber);
                    assert added;
                }
                // No assertion on the subscription state afterwards: if the subscriber is
                // already unsubscribed, add() unsubscribes the new subscription right away,
                // which simply removes the registration made above.
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        synchronized (Head.this.receiverMutex) {
                            boolean removed = Head.this.receivers.remove(route, subscriber);
                            assert removed;
                        }
                    }
                }));
            }
        });
    }

    /**
     * Returns an observable whose subscribers are registered as default
     * receivers while subscribed and removed again on unsubscribe.
     *
     * @return an observable of messages that fall through to the default receivers
     */
    public Observable<Message> defaultReceive() {
        return Observable.create(new Observable.OnSubscribe<Message>() {
            @Override
            public void call(final Subscriber<? super Message> subscriber) {
                synchronized (Head.this.receiverMutex) {
                    boolean added = Head.this.defaultReceivers.add(subscriber);
                    assert added;
                }
                // See receive(Route): an already-unsubscribed subscriber triggers the
                // cleanup immediately, which is valid and must not trip an assertion.
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        synchronized (Head.this.receiverMutex) {
                            boolean removed = Head.this.defaultReceivers.remove(subscriber);
                            assert removed;
                        }
                    }
                }));
            }
        });
    }

    /**
     * Posts a task that initializes the chain with no upstream channel.
     *
     * @param savedState state passed on to the downstream node, may be {@code null}
     */
    public void init(final @Nullable MessageControlNode.Bundle savedState) {
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.init(null, savedState);
            }
        });
    }

    /** Posts a task that calls {@code onActive(true)}. */
    public void start() {
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.onActive(true);
            }
        });
    }

    /** Posts a task that calls {@code onActive(false)}. */
    public void stop() {
        this.post(new Runnable() {
            @Override
            public void run() {
                Head.this.onActive(false);
            }
        });
    }

    /**
     * Initializes the downstream node with a channel that routes controls coming
     * back from downstream to the registered subscribers.
     *
     * @param upstream must be {@code null}
     * @param savedState state passed on to the downstream node, may be {@code null}
     * @throws IllegalArgumentException if {@code upstream} is not {@code null}
     */
    @Override
    public void init(@Nullable MessageControlChannel upstream, @Nullable MessageControlNode.Bundle savedState) {
        if (null != upstream) {
            throw new IllegalArgumentException("Head must not be given an upstream channel");
        }
        this.downstream.init(new MessageControlChannel() {

            @Override
            public MessageControlState getMessageControlState() {
                return Head.this.getMessageControlState();
            }

            @Override
            public void onActive(boolean active) {
                // Activity changes reported through this channel are ignored.
            }

            @Override
            public void onMessageControl(final MessageControl mc) {
                switch (mc.type) {
                    case MESSAGE: {
                        Head.this.callbackWorker.schedule(new Action0() {
                            @Override
                            public void call() {
                                // Route subscribers take precedence; default receivers are the fallback.
                                Subscriber<? super Message> target = Head.this.firstReceiver(mc.message.route, true);
                                if (null != target) {
                                    target.onNext(mc.message);
                                }
                            }
                        });
                        break;
                    }
                    case COMPLETE: {
                        Head.this.callbackWorker.schedule(new Action0() {
                            @Override
                            public void call() {
                                Subscriber<? super Message> target = Head.this.firstReceiver(mc.message.route, false);
                                if (null != target) {
                                    target.onCompleted();
                                }
                            }
                        });
                        break;
                    }
                    case ERROR: {
                        Head.this.callbackWorker.schedule(new Action0() {
                            @Override
                            public void call() {
                                Subscriber<? super Message> target = Head.this.firstReceiver(mc.message.route, false);
                                if (null != target) {
                                    target.onError(new ReceiveException(mc.message));
                                }
                            }
                        });
                        break;
                    }
                    default: {
                        // Any other control type is not dispatched to subscribers.
                        break;
                    }
                }
            }

            @Override
            public void post(Runnable r) {
                Head.this.post(r);
            }

            @Override
            public void postDelayed(Runnable r, int delayMs) {
                Head.this.postDelayed(r, delayMs);
            }

            @Override
            public Scheduler getScheduler() {
                return Head.this.getScheduler();
            }
        }, savedState);
    }

    /**
     * Looks up, under the receiver lock, the subscriber a control should be delivered to.
     * The subscriber itself is invoked by the caller after the lock has been released.
     *
     * @param route route of the incoming message
     * @param includeDefaultReceivers whether default receivers are considered after the route's own subscribers
     * @return the first matching subscriber, or {@code null} if there is none
     */
    @Nullable
    private Subscriber<? super Message> firstReceiver(Route route, boolean includeDefaultReceivers) {
        synchronized (this.receiverMutex) {
            List<Subscriber<? super Message>> routed = this.receivers.get(route);
            if (!includeDefaultReceivers) {
                return Iterables.getFirst(routed, null);
            }
            Iterable<Subscriber<? super Message>> candidates = Iterables.concat(routed, this.defaultReceivers);
            return Iterables.getFirst(candidates, null);
        }
    }

    /** Delegates to the downstream node. */
    @Override
    public void onSaveState(MessageControlNode.Bundle savedState) {
        this.downstream.onSaveState(savedState);
    }

    /** Delegates to the downstream node. */
    @Override
    public void onActive(boolean active) {
        this.downstream.onActive(active);
    }

    /** Delegates to the downstream node. */
    @Override
    public void onMessageControl(MessageControl mc) {
        this.downstream.onMessageControl(mc);
    }

    /** Returns the control state supplied at construction. */
    @Override
    public MessageControlState getMessageControlState() {
        return this.mcs;
    }

    /** Delegates to {@link MessageContext#post}. */
    @Override
    public void post(Runnable r) {
        this.context.post(r);
    }

    /** Delegates to {@link MessageContext#postDelayed}. */
    @Override
    public void postDelayed(Runnable r, int delayMs) {
        this.context.postDelayed(r, delayMs);
    }

    /** Returns the scheduler of the {@link MessageContext}. */
    @Override
    public Scheduler getScheduler() {
        return this.context.getScheduler();
    }

    /**
     * Error passed to a route subscriber's {@code onError} when an {@code ERROR}
     * control arrives for that route; carries the message of that control.
     */
    public static final class ReceiveException
    extends Exception {
        public final Message message;

        private ReceiveException(Message message) {
            this.message = message;
        }
    }
}

