/*
 * Decompiled with CFR 0.152.
 */
package com.logviewer.data2.net.server;

import com.logviewer.data2.LogService;
import com.logviewer.data2.net.AbstractConnection;
import com.logviewer.data2.net.server.RemoteTaskContextImpl;
import com.logviewer.data2.net.server.api.RemoteTask;
import com.logviewer.data2.net.server.msg.MessageStartTask;
import com.logviewer.data2.net.server.msg.MessageTaskCallbackCall;
import com.logviewer.data2.net.server.msg.MessageTaskChangeEvent;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncomeConnection
extends AbstractConnection {
    private static final Logger LOG = LoggerFactory.getLogger(IncomeConnection.class);
    private final LogService logService;
    private final Consumer<IncomeConnection> disconnectListener;
    private final Map<Long, RemoteTask> tasks = new HashMap<Long, RemoteTask>();

    public IncomeConnection(AsynchronousSocketChannel socket, LogService logService, Consumer<IncomeConnection> disconnectListener) {
        super(socket);
        this.logService = logService;
        this.disconnectListener = disconnectListener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void handleMessage(Object msg) {
        Class<?> msgType = msg.getClass();
        if (msgType == MessageStartTask.class) {
            RemoteTaskContextImpl<Object> ctx;
            MessageStartTask message = (MessageStartTask)msg;
            RemoteTask task = message.getTask();
            long taskId = message.getTaskId();
            IncomeConnection incomeConnection = this;
            synchronized (incomeConnection) {
                if (this.closed) {
                    return;
                }
                RemoteTask oldTask = this.tasks.putIfAbsent(taskId, task);
                if (oldTask != null) {
                    LOG.error("Duplicated task [id={}, task={}]", (Object)taskId, (Object)task);
                    return;
                }
                ctx = new RemoteTaskContextImpl<Object>(this.logService, (e, isTaskStopped) -> {
                    IncomeConnection incomeConnection = this;
                    synchronized (incomeConnection) {
                        this.sendMessage(new MessageTaskCallbackCall(taskId, e, (boolean)isTaskStopped));
                        if (isTaskStopped.booleanValue()) {
                            this.tasks.remove(taskId);
                        }
                    }
                }, error -> {
                    IncomeConnection incomeConnection = this;
                    synchronized (incomeConnection) {
                        this.sendMessage(new MessageTaskCallbackCall(taskId, (Throwable)error));
                        this.tasks.remove(taskId);
                    }
                });
            }
            try {
                task.start(ctx);
            }
            catch (Throwable e2) {
                ctx.sendErrorAndCloseChannel(e2);
            }
        } else {
            if (msgType == MessageTaskChangeEvent.class) {
                MessageTaskChangeEvent message = (MessageTaskChangeEvent)msg;
                IncomeConnection incomeConnection = this;
                synchronized (incomeConnection) {
                    if (this.closed) {
                        return;
                    }
                    long taskId = message.getTaskId();
                    RemoteTask task = this.tasks.get(taskId);
                    if (task == null) {
                        return;
                    }
                    if (message.getModifier() == null) {
                        this.tasks.remove(message.getTaskId());
                        try {
                            task.cancel();
                        }
                        catch (Throwable e3) {
                            LOG.error("Failed to call 'cancel()' method on task controller", e3);
                        }
                    } else {
                        try {
                            message.getModifier().accept(task);
                        }
                        catch (Throwable e4) {
                            LOG.error("Failed to handle control event", e4);
                        }
                    }
                }
            }
            throw new IllegalArgumentException("Unknown message: {}" + msg);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onDisconnect() {
        IncomeConnection incomeConnection = this;
        synchronized (incomeConnection) {
            for (RemoteTask task : this.tasks.values()) {
                try {
                    task.cancel();
                }
                catch (Throwable e) {
                    LOG.error("Failed to call 'cancel()' method on task controller", e);
                }
            }
            this.tasks.clear();
        }
        ForkJoinPool.commonPool().execute(() -> this.disconnectListener.accept(this));
    }
}

