package io.contek.invoker.commons.api.websocket;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/contek/invoker/commons/api/websocket/BaseWebSocketChannel.class */
public abstract class BaseWebSocketChannel<Message> implements IWebSocketComponent {
    private static final Logger log = LoggerFactory.getLogger(BaseWebSocketChannel.class);
    private final AtomicReference<SubscriptionState> stateHolder = new AtomicReference<>(SubscriptionState.UNSUBSCRIBED);
    private final List<ISubscribingConsumer<Message>> consumers = new LinkedList();

    public final void addConsumer(ISubscribingConsumer<Message> iSubscribingConsumer) {
        synchronized (this.consumers) {
            synchronized (this.stateHolder) {
                iSubscribingConsumer.onStateChange(this.stateHolder.get());
            }
            this.consumers.add(iSubscribingConsumer);
        }
    }

    @Override // io.contek.invoker.commons.api.websocket.IWebSocketComponent
    public final void heartbeat(WebSocketSession webSocketSession) {
        synchronized (this.consumers) {
            ConsumerState childConsumerState = getChildConsumerState();
            synchronized (this.stateHolder) {
                SubscriptionState subscriptionState = this.stateHolder.get();
                SubscriptionState subscriptionState2 = null;
                if (subscriptionState == SubscriptionState.SUBSCRIBED && childConsumerState == ConsumerState.IDLE) {
                    log.info("Unsubscribing channel {}.", getDisplayName());
                    subscriptionState2 = unsubscribe(webSocketSession);
                    if (subscriptionState2 == SubscriptionState.SUBSCRIBED || subscriptionState2 == SubscriptionState.SUBSCRIBING) {
                        log.error("Channel {} has invalid state after unsubscribe: {}.", getDisplayName(), subscriptionState2);
                    }
                } else if (subscriptionState == SubscriptionState.UNSUBSCRIBED && childConsumerState == ConsumerState.ACTIVE) {
                    log.info("Subscribing channel {}.", getDisplayName());
                    subscriptionState2 = subscribe(webSocketSession);
                    if (subscriptionState2 == SubscriptionState.UNSUBSCRIBED || subscriptionState2 == SubscriptionState.UNSUBSCRIBING) {
                        log.error("Channel {} has invalid state after subscribe: {}.", getDisplayName(), subscriptionState2);
                    }
                }
                if (subscriptionState2 != null) {
                    setState(subscriptionState2);
                }
            }
        }
    }

    @Override // io.contek.invoker.commons.api.websocket.IWebSocketComponent
    public final ConsumerState getState() {
        ConsumerState consumerState;
        synchronized (this.consumers) {
            if (getChildConsumerState() == ConsumerState.ACTIVE) {
                return ConsumerState.ACTIVE;
            }
            synchronized (this.stateHolder) {
                consumerState = this.stateHolder.get() != SubscriptionState.UNSUBSCRIBED ? ConsumerState.ACTIVE : ConsumerState.IDLE;
            }
            return consumerState;
        }
    }

    @Override // io.contek.invoker.commons.api.websocket.IWebSocketListener
    public final void onMessage(AnyWebSocketMessage anyWebSocketMessage) {
        synchronized (this.consumers) {
            Message tryCast = tryCast(anyWebSocketMessage);
            if (tryCast != null) {
                this.consumers.forEach(iSubscribingConsumer -> {
                    iSubscribingConsumer.onNext(tryCast);
                });
            }
        }
        SubscriptionState state = getState(anyWebSocketMessage);
        if (state != null) {
            log.info("Channel {} is now {}.", getDisplayName(), state);
            setState(state);
        }
    }

    @Override // io.contek.invoker.commons.api.websocket.IWebSocketListener
    public final void afterDisconnect() {
        reset();
        setState(SubscriptionState.UNSUBSCRIBED);
    }

    protected abstract String getDisplayName();

    protected abstract SubscriptionState subscribe(WebSocketSession webSocketSession);

    protected abstract SubscriptionState unsubscribe(WebSocketSession webSocketSession);

    @Nullable
    protected abstract SubscriptionState getState(AnyWebSocketMessage anyWebSocketMessage);

    protected abstract Class<Message> getMessageType();

    protected abstract boolean accepts(Message message);

    protected abstract void reset();

    private ConsumerState getChildConsumerState() {
        ConsumerState consumerState;
        synchronized (this.consumers) {
            this.consumers.removeIf(iSubscribingConsumer -> {
                return iSubscribingConsumer.getState() == ConsumerState.TERMINATED;
            });
            consumerState = this.consumers.stream().anyMatch(iSubscribingConsumer2 -> {
                return iSubscribingConsumer2.getState() == ConsumerState.ACTIVE;
            }) ? ConsumerState.ACTIVE : ConsumerState.IDLE;
        }
        return consumerState;
    }

    @Nullable
    private Message tryCast(AnyWebSocketMessage anyWebSocketMessage) {
        if (!getMessageType().isAssignableFrom(anyWebSocketMessage.getClass())) {
            return null;
        }
        Message cast = getMessageType().cast(anyWebSocketMessage);
        if (accepts(cast)) {
            return cast;
        }
        return null;
    }

    private void setState(SubscriptionState subscriptionState) {
        synchronized (this.consumers) {
            synchronized (this.stateHolder) {
                this.consumers.forEach(iSubscribingConsumer -> {
                    iSubscribingConsumer.onStateChange(subscriptionState);
                });
                this.stateHolder.set(subscriptionState);
            }
        }
    }
}
