/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.concurrent.executor;

import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.concurrent.executor.Result;
import com.oracle.coherence.concurrent.executor.Task;
import com.oracle.coherence.concurrent.executor.internal.ExecutorTrace;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Base {@link Task.Coordinator} implementation that keeps track of the
 * {@link Task.Subscriber}s of a task, publishes results to them and notifies
 * them when the coordinator is closed or cancelled.
 * <p>
 * Subscriber callbacks triggered by {@link #offer(Result)}, {@link #close()}
 * and late subscriptions are handed to the {@link ExecutorService} supplied at
 * construction; if the executor rejects the work it is run on the calling
 * thread instead.
 *
 * @param <T> the type of result produced by the task
 */
public abstract class AbstractTaskCoordinator<T>
implements Task.Coordinator<T> {
    /** Executor used to dispatch subscriber callbacks. */
    protected final ExecutorService f_executorService;

    /** Identity of the coordinated task. */
    protected final String f_sTaskId;

    /**
     * When {@code true}, a subscriber arriving after the coordinator was closed
     * is passed to {@link #subscribeRetainedTask(Task.Subscriber)}.
     */
    protected final boolean f_fRetainTask;

    /** Set to {@code true} once {@link #close()} has been invoked. */
    protected final AtomicBoolean f_closed;

    /** Set to {@code true} once {@link #cancel(boolean)} has succeeded. */
    protected final AtomicBoolean f_cancelled;

    /** The currently registered subscribers. */
    protected Set<Task.Subscriber<? super T>> m_setSubscribers;

    /**
     * The value replayed to subscribers that arrive after the coordinator was
     * closed. Initialised to {@link Result#none()}; this class never reassigns
     * it (presumably subclasses do).
     */
    protected volatile Result<T> m_lastValue;

    /**
     * Creates a coordinator for the given task.
     *
     * @param taskId           the identity of the task
     * @param executorService  the executor used to dispatch subscriber callbacks
     * @param fRetainTask      whether late subscribers are forwarded to
     *                         {@link #subscribeRetainedTask(Task.Subscriber)}
     */
    public AbstractTaskCoordinator(String taskId, ExecutorService executorService, boolean fRetainTask) {
        this.f_sTaskId = taskId;
        this.f_fRetainTask = fRetainTask;
        this.m_setSubscribers = new CopyOnWriteArraySet<Task.Subscriber<? super T>>();
        this.m_lastValue = Result.none();
        this.f_cancelled = new AtomicBoolean(false);
        this.f_closed = new AtomicBoolean(false);
        this.f_executorService = executorService;
    }

    @Override
    public String getTaskId() {
        return this.f_sTaskId;
    }

    /**
     * Cancels the task if it has been neither closed nor cancelled already,
     * then closes the coordinator.
     *
     * @param mayInterruptIfRunning  not consulted by this implementation
     *
     * @return {@code true} if this call performed the cancellation
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!this.f_closed.get() && this.f_cancelled.compareAndSet(false, true)) {
            this.close();
            return true;
        }
        return false;
    }

    @Override
    public boolean isCancelled() {
        return this.f_cancelled.get();
    }

    /**
     * @return {@code true} once the coordinator has been closed
     */
    @Override
    public boolean isDone() {
        return this.f_closed.get();
    }

    /**
     * Registers a subscriber.
     * <p>
     * The subscriber first receives a {@link Task.Subscription}. If the
     * coordinator is already closed the subscriber is not registered; instead
     * it is (optionally) passed to {@link #subscribeRetainedTask(Task.Subscriber)},
     * the last value is replayed to it and it is closed. Otherwise it is added
     * to the set of subscribers, unless it cancelled its subscription in the
     * meantime.
     *
     * @param subscriber  the subscriber to register
     */
    @Override
    public void subscribe(final Task.Subscriber<? super T> subscriber) {
        // Remembers a cancellation issued before the subscriber has been added to the
        // set (typically from within onSubscribe); without it the remove() performed
        // by cancel() would be undone by the add() below.
        final AtomicBoolean subscriptionCancelled = new AtomicBoolean(false);

        subscriber.onSubscribe(new Task.Subscription<T>(){

            @Override
            public void cancel() {
                subscriptionCancelled.set(true);
                AbstractTaskCoordinator.this.m_setSubscribers.remove(subscriber);
            }

            @Override
            public Task.Coordinator<T> getCoordinator() {
                return AbstractTaskCoordinator.this;
            }
        });

        if (this.f_closed.get()) {
            // Late subscriber: never registered, only brought up to date and closed.
            if (this.f_fRetainTask) {
                this.subscribeRetainedTask(subscriber);
            }
            this.submitOrRun(() -> this.replayLastValueAndClose(subscriber));
            return;
        }

        // NOTE(review): close() may run between the f_closed check above and the add
        // below, in which case this subscriber might miss the close notification.
        if (!subscriptionCancelled.get()) {
            this.m_setSubscribers.add(subscriber);
            // cancel() sets the flag before removing, so re-checking after the add
            // covers a cancellation that raced with the registration.
            if (subscriptionCancelled.get()) {
                this.m_setSubscribers.remove(subscriber);
            }
        }
    }

    /**
     * Notifies a subscriber that the coordinator is closed: with
     * {@code onError} if the task was cancelled, with {@code onComplete} if the
     * last value is an actual value, and with nothing otherwise.
     * <p>
     * If the notification itself fails, the subscriber is removed and given the
     * failure through {@code onError} on a best-effort basis.
     *
     * @param subscriber  the subscriber to close
     * @param fRemove     whether to remove the subscriber from the registered set first
     */
    protected void closeSubscriber(Task.Subscriber<? super T> subscriber, boolean fRemove) {
        // Single read of the volatile field so the null check and the use agree.
        Result<T> lastValue = this.m_lastValue;

        ExecutorTrace.entering(AbstractTaskCoordinator.class, "closeSubscriber", subscriber, this.getTaskId(), lastValue);
        if (fRemove) {
            ExecutorTrace.log(() -> String.format("Removing Subscriber %s", subscriber));
            this.m_setSubscribers.remove(subscriber);
            ExecutorTrace.log(() -> String.format("Removed Subscriber %s", subscriber));
        }
        try {
            if (this.f_cancelled.get()) {
                ExecutorTrace.log(() -> String.format("Notifying Subscriber %s of cancellation", subscriber));
                subscriber.onError(new InterruptedException("Task " + this.getTaskId() + " has been cancelled."));
            } else if (lastValue != null && lastValue.isValue()) {
                subscriber.onComplete();
            } else {
                ExecutorTrace.log(() -> String.format("Subscriber %s of closed, but no results received.", subscriber));
            }
        }
        catch (Throwable throwable) {
            // Keep the cause in the warning so the failure can be diagnosed.
            Logger.warn(String.format("Failed to close subscriber %s", subscriber), throwable);
            ExecutorTrace.throwing(AbstractTaskCoordinator.class, "closeSubscriber", throwable, new Object[0]);
            this.m_setSubscribers.remove(subscriber);
            try {
                subscriber.onError(throwable);
            }
            catch (Throwable nested) {
                // Best effort only: the subscriber is already removed, so just record it.
                ExecutorTrace.throwing(AbstractTaskCoordinator.class, "closeSubscriber", nested, new Object[0]);
            }
        }
        ExecutorTrace.exiting(AbstractTaskCoordinator.class, "closeSubscriber");
    }

    /**
     * Closes the coordinator. The first call schedules the closing (and
     * removal) of every registered subscriber; later calls do nothing.
     */
    public void close() {
        if (this.f_closed.compareAndSet(false, true)) {
            ExecutorTrace.log("Scheduling the closing of subscribers");
            this.submitOrRun(() -> {
                for (Task.Subscriber<? super T> subscriber : this.m_setSubscribers) {
                    this.closeSubscriber(subscriber, true);
                }
            });
        } else {
            ExecutorTrace.log("Skipped closing subscribers as the coordinator is already closed");
        }
    }

    /**
     * @return {@code true} if at least one subscriber is registered
     */
    public boolean hasSubscribers() {
        return !this.m_setSubscribers.isEmpty();
    }

    /**
     * Publishes a result to all registered subscribers, unless the coordinator
     * is closed or has no subscribers.
     *
     * @param item  the result to publish
     */
    public void offer(Result<T> item) {
        ExecutorTrace.entering(AbstractTaskCoordinator.class, "offer", item);
        if (!this.f_closed.get() && this.hasSubscribers()) {
            this.submitOrRun(() -> this.publish(item));
        }
        ExecutorTrace.exiting(AbstractTaskCoordinator.class, "offer");
    }

    /**
     * Invoked for a subscriber that subscribes after the coordinator was
     * closed, when the coordinator was created with {@code fRetainTask} set.
     *
     * @param var1  the late subscriber
     */
    protected abstract void subscribeRetainedTask(Task.Subscriber<?> var1);

    /**
     * Submits the work to the executor, running it on the calling thread if the
     * executor rejects it.
     */
    private void submitOrRun(Runnable work) {
        try {
            this.f_executorService.submit(work);
        }
        catch (RejectedExecutionException e) {
            work.run();
        }
    }

    /**
     * Replays the last value (or the error it carries) to a late subscriber and
     * then closes that subscriber.
     */
    private void replayLastValueAndClose(Task.Subscriber<? super T> subscriber) {
        // Single read of the volatile field so the null check and the use agree.
        Result<T> lastValue = this.m_lastValue;
        if (lastValue != null) {
            T value = null;
            Throwable failure = null;
            try {
                value = lastValue.get();
            }
            catch (Throwable t) {
                failure = t;
            }
            try {
                if (failure == null) {
                    subscriber.onNext(value);
                } else {
                    subscriber.onError(failure);
                }
            }
            catch (Throwable t) {
                // Best effort: a failing subscriber must not prevent it from being
                // closed below, but the failure is recorded rather than dropped.
                ExecutorTrace.throwing(AbstractTaskCoordinator.class, "subscribe", t, new Object[0]);
            }
        }
        this.closeSubscriber(subscriber, false);
    }

    /**
     * Delivers a result to every registered subscriber ({@code onNext} for a
     * value, {@code onError} if obtaining the value failed), removing any
     * subscriber that throws an exception while being notified.
     */
    private void publish(Result<T> item) {
        T result = null;
        Throwable resultError = null;
        try {
            result = item.get();
        }
        catch (Throwable t) {
            resultError = t;
        }
        for (Task.Subscriber<? super T> subscriber : this.m_setSubscribers) {
            try {
                if (resultError == null) {
                    subscriber.onNext(result);
                } else {
                    subscriber.onError(resultError);
                }
            }
            catch (Exception e) {
                // NOTE(review): 2 is presumably the logger's warning level - confirm.
                if (Logger.isEnabled(2)) {
                    String sMsg = "Task [%s]: removing subscriber [%s] as it threw an exception processing result: [%s]";
                    Logger.warn(String.format(sMsg, this.getTaskId(), subscriber, item), e);
                }
                this.m_setSubscribers.remove(subscriber);
            }
        }
    }
}

