/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.server.session;

import io.netty.util.internal.ThreadLocalRandom;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.Reactors;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MultiGatewayDeviceSession
extends CopyOnWriteArrayList<DeviceSession>
implements DeviceSession,
Disposable {
    private final long connectTime = System.currentTimeMillis();
    private final transient DeviceOperator device;
    private final transient DeviceSessionManager sessionManager;
    private final Disposable.Composite disposable = Disposables.composite();

    public MultiGatewayDeviceSession(DeviceOperator device, DeviceSessionManager sessionManager) {
        this.device = device;
        this.sessionManager = sessionManager;
    }

    @Override
    public String getId() {
        return this.device.getDeviceId();
    }

    @Override
    public String getDeviceId() {
        return this.device.getDeviceId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void register(DeviceSession session) {
        MultiGatewayDeviceSession multiGatewayDeviceSession = this;
        synchronized (multiGatewayDeviceSession) {
            this.add(session);
        }
    }

    @Override
    @Nullable
    public final DeviceOperator getOperator() {
        return this.device;
    }

    @Override
    public long lastPingTime() {
        return this.stream().mapToLong(DeviceSession::lastPingTime).max().orElse(System.currentTimeMillis());
    }

    @Override
    public long connectTime() {
        return this.connectTime;
    }

    @Override
    public Mono<Boolean> send(EncodedMessage encodedMessage) {
        return Mono.deferContextual(ctx -> {
            DeviceSession session;
            DeviceMessage msg = ctx.getOrEmpty(DeviceMessage.class).orElse(null);
            if (msg != null) {
                // empty if block
            }
            if ((session = this.takeSession()) == null) {
                return Mono.error((Throwable)new DeviceOperationException.NoStackTrace(ErrorCode.CONNECTION_LOST));
            }
            return this.sendTo(session, encodedMessage);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeSession(DeviceSession session) {
        MultiGatewayDeviceSession multiGatewayDeviceSession = this;
        synchronized (multiGatewayDeviceSession) {
            this.remove(session);
        }
    }

    private void handleClosed(DeviceSession connection) {
        this.removeSession(connection);
        this.sessionManager.getSession(this.getDeviceId(), true).subscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final DeviceSession takeSession() {
        while (true) {
            DeviceSession session;
            MultiGatewayDeviceSession multiGatewayDeviceSession = this;
            synchronized (multiGatewayDeviceSession) {
                int size = this.size();
                if (size == 0) {
                    return null;
                }
                session = size == 1 ? (DeviceSession)this.get(0) : (DeviceSession)this.get(ThreadLocalRandom.current().nextInt(size));
            }
            if (session.isAlive()) {
                return session;
            }
            session.close();
            this.handleClosed(session);
        }
    }

    public Mono<Boolean> sendTo(DeviceSession session, EncodedMessage encodedMessage) {
        return session.send(encodedMessage);
    }

    @Override
    public Transport getTransport() {
        if (this.size() == 0) {
            return DefaultTransport.TCP;
        }
        return ((DeviceSession)this.get(0)).getTransport();
    }

    @Override
    public void close() {
        this.disposable.dispose();
        this.forEach(DeviceSession::close);
        this.clear();
    }

    @Override
    public void ping() {
        this.forEach(DeviceSession::ping);
    }

    @Override
    public boolean isAlive() {
        if (this.disposable.isDisposed() || this.isEmpty()) {
            return false;
        }
        boolean anyAlive = false;
        for (DeviceSession session : this) {
            if (session.isAlive()) {
                anyAlive = true;
                continue;
            }
            this.remove(session);
        }
        return anyAlive;
    }

    @Override
    public Mono<Boolean> isAliveAsync() {
        if (this.disposable.isDisposed() || this.isEmpty()) {
            return Reactors.ALWAYS_FALSE;
        }
        return Flux.fromIterable((Iterable)this).filterWhen(session -> session.isAliveAsync().doOnNext(alive -> {
            if (!alive.booleanValue()) {
                this.remove(session);
            }
        })).hasElements();
    }

    @Override
    public void onClose(Runnable call) {
        this.disposable.add(call::run);
    }

    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    public void dispose() {
        this.disposable.dispose();
    }
}

