/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.device;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class StandaloneDeviceMessageBroker
implements DeviceOperationBroker,
MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(StandaloneDeviceMessageBroker.class);
    private final Sinks.Many<Message> messageEmitterProcessor;
    private final Map<String, Sinks.Many<DeviceMessageReply>> replyProcessor = new ConcurrentHashMap<String, Sinks.Many<DeviceMessageReply>>();
    private final Map<String, AtomicInteger> partCache = new ConcurrentHashMap<String, AtomicInteger>();
    private final List<Function<Message, Mono<Void>>> handlers = new CopyOnWriteArrayList<Function<Message, Mono<Void>>>();
    private ReplyFailureHandler replyFailureHandler = (error, message) -> log.info("unhandled reply message:{}", (Object)message, (Object)error);
    private Function<Publisher<String>, Flux<DeviceStateInfo>> stateHandler;

    public StandaloneDeviceMessageBroker() {
        this((Sinks.Many<Message>)Sinks.many().multicast().onBackpressureBuffer());
    }

    public StandaloneDeviceMessageBroker(Sinks.Many<Message> processor) {
        this.messageEmitterProcessor = processor;
    }

    @Override
    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        return this.messageEmitterProcessor.asFlux();
    }

    @Override
    public Disposable handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        this.stateHandler = stateMapper;
        return () -> {
            this.stateHandler = null;
        };
    }

    @Override
    public Disposable handleSendToDeviceMessage(String serverId, Function<Message, Mono<Void>> handler) {
        this.handlers.add(handler);
        return () -> this.handlers.remove(handler);
    }

    @Override
    public Flux<DeviceStateInfo> getDeviceState(String serviceId, Collection<String> deviceIdList) {
        if (this.stateHandler != null) {
            return this.stateHandler.apply((Publisher<String>)Flux.fromIterable(deviceIdList));
        }
        return Flux.empty();
    }

    @Override
    public Mono<Boolean> reply(DeviceMessageReply message) {
        return Mono.defer(() -> {
            String messageId = message.getMessageId();
            if (StringUtils.isEmpty((Object)messageId)) {
                log.warn("reply message messageId is empty: {}", (Object)message);
                return Mono.just((Object)false);
            }
            String partMsgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (partMsgId != null) {
                Sinks.Many<DeviceMessageReply> processor = this.replyProcessor.getOrDefault(partMsgId, this.replyProcessor.get(messageId));
                if (processor == null || processor.currentSubscriberCount() == 0) {
                    this.replyFailureHandler.handle(new NullPointerException("no reply handler"), message);
                    this.replyProcessor.remove(partMsgId);
                    return Mono.just((Object)false);
                }
                int partTotal = message.getHeader(Headers.fragmentNumber).orElse(1);
                AtomicInteger counter = this.partCache.computeIfAbsent(partMsgId, ignore -> new AtomicInteger(partTotal));
                processor.emitNext((Object)message, Sinks.EmitFailureHandler.FAIL_FAST);
                if (counter.decrementAndGet() <= 0) {
                    processor.tryEmitComplete();
                    this.replyProcessor.remove(partMsgId);
                }
                return Mono.just((Object)true);
            }
            Sinks.Many<DeviceMessageReply> processor = this.replyProcessor.get(messageId);
            Sinks.EmitResult result = processor.tryEmitNext((Object)message);
            if (result.isFailure()) {
                this.replyProcessor.remove(messageId);
                this.replyFailureHandler.handle(new DeviceOperationException.NoStackTrace(ErrorCode.SYSTEM_ERROR, "no reply handler " + result.name()), message);
                return Mono.just((Object)false);
            }
            processor.tryEmitComplete();
            return Mono.just((Object)true);
        }).doOnError(err -> this.replyFailureHandler.handle((Throwable)err, message));
    }

    @Override
    public Flux<DeviceMessageReply> handleReply(String deviceId, String messageId, Duration timeout) {
        return ((Flux)this.replyProcessor.computeIfAbsent(messageId, ignore -> Sinks.many().multicast().onBackpressureBuffer()).asFlux().as(flux -> {
            if (timeout.isZero()) {
                return flux;
            }
            return flux.timeout(timeout, (Publisher)Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)));
        })).doFinally(signal -> this.replyProcessor.remove(messageId));
    }

    @Override
    public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
        if (this.messageEmitterProcessor.currentSubscriberCount() == 0 && this.handlers.isEmpty()) {
            return Reactors.ALWAYS_ZERO;
        }
        return Flux.from(message).flatMap(this::send).then(Reactors.ALWAYS_ONE);
    }

    private Mono<Void> send(Message msg) {
        int size;
        if (this.messageEmitterProcessor.currentSubscriberCount() > 0) {
            this.messageEmitterProcessor.emitNext((Object)msg, Reactors.emitFailureHandler());
        }
        if ((size = this.handlers.size()) == 0) {
            return Mono.empty();
        }
        if (size == 1) {
            return this.handlers.get(0).apply(msg);
        }
        return Flux.fromIterable(this.handlers).flatMap(handler -> (Mono)handler.apply(msg)).then();
    }

    @Override
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        return Mono.just((Object)0);
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}

