/*
 * Decompiled with CFR 0.152.
 */
package org.openqa.selenium.events.zeromq;

import com.google.common.collect.EvictingQueue;
import dev.failsafe.Failsafe;
import dev.failsafe.Policy;
import dev.failsafe.RetryPolicy;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openqa.selenium.events.Event;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.EventListener;
import org.openqa.selenium.events.EventName;
import org.openqa.selenium.events.zeromq.ZeroMqEventBus;
import org.openqa.selenium.grid.security.Secret;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.json.JsonException;
import org.openqa.selenium.json.JsonOutput;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

class UnboundZmqEventBus
implements EventBus {
    static final EventName REJECTED_EVENT = new EventName("selenium-rejected-event");
    private static final Logger LOG = Logger.getLogger(EventBus.class.getName());
    private static final Json JSON = new Json();
    private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
    private final ExecutorService socketPollingExecutor;
    private final ExecutorService socketPublishingExecutor;
    private final ExecutorService listenerNotificationExecutor;
    private final Map<EventName, List<Consumer<Event>>> listeners = new ConcurrentHashMap<EventName, List<Consumer<Event>>>();
    private final Queue<UUID> recentMessages = EvictingQueue.create((int)128);
    private final String encodedSecret;
    private ZMQ.Poller poller;
    private ZMQ.Socket pub;
    private ZMQ.Socket sub;

    UnboundZmqEventBus(ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
        Require.nonNull((String)"Secret", (Object)secret);
        StringBuilder builder = new StringBuilder();
        try (JsonOutput out = JSON.newOutput((Appendable)builder);){
            out.setPrettyPrint(false).writeClassName(false).write((Object)secret);
        }
        this.encodedSecret = builder.toString();
        this.socketPollingExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("Event Bus Poller");
            thread.setDaemon(true);
            return thread;
        });
        this.socketPublishingExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("Event Bus Publisher");
            thread.setDaemon(true);
            return thread;
        });
        this.listenerNotificationExecutor = Executors.newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors() / 2, 2), r -> {
            Thread thread = new Thread(r);
            thread.setName("Event Bus Listener Notifier");
            thread.setDaemon(true);
            return thread;
        });
        String connectionMessage = String.format("Connecting to %s and %s", publishConnection, subscribeConnection);
        LOG.info(connectionMessage);
        RetryPolicy retryPolicy = RetryPolicy.builder().withMaxAttempts(5).withDelay(5L, 10L, ChronoUnit.SECONDS).onFailedAttempt(e -> LOG.log(Level.WARNING, String.format("%s failed", connectionMessage))).onRetry(e -> LOG.log(Level.WARNING, String.format("Failure #%s. Retrying.", e.getAttemptCount()))).onRetriesExceeded(e -> LOG.log(Level.WARNING, "Connection aborted.")).build();
        Failsafe.with((Policy)retryPolicy, (Policy[])new RetryPolicy[0]).run(() -> {
            this.sub = context.createSocket(SocketType.SUB);
            this.sub.setIPv6(this.isSubAddressIPv6(publishConnection));
            this.sub.connect(publishConnection);
            this.sub.subscribe(new byte[0]);
            this.pub = context.createSocket(SocketType.PUB);
            this.pub.setIPv6(this.isSubAddressIPv6(subscribeConnection));
            this.pub.connect(subscribeConnection);
        });
        this.poller = context.createPoller(1);
        this.poller.register(Objects.requireNonNull(this.sub), 1);
        LOG.info("Sockets created");
        this.socketPollingExecutor.submit(new PollingRunnable(secret));
        while (!this.pollingStarted.get()) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e2);
            }
        }
        LOG.info("Event bus ready");
    }

    @Override
    public boolean isReady() {
        return !this.socketPollingExecutor.isShutdown();
    }

    private boolean isSubAddressIPv6(String connection) {
        try {
            URI uri = new URI(connection);
            if ("inproc".equals(uri.getScheme())) {
                return false;
            }
            return InetAddress.getByName(uri.getHost()) instanceof Inet6Address;
        }
        catch (URISyntaxException | UnknownHostException e) {
            LOG.log(Level.WARNING, String.format("Could not determine if the address %s is IPv6 or IPv4", connection), e);
            return false;
        }
    }

    @Override
    public void addListener(EventListener<?> listener) {
        Require.nonNull((String)"Listener", listener);
        List typeListeners = this.listeners.computeIfAbsent(listener.getEventName(), t -> new LinkedList());
        typeListeners.add(listener);
    }

    @Override
    public void fire(Event event) {
        Require.nonNull((String)"Event to send", (Object)event);
        this.socketPublishingExecutor.execute(() -> {
            this.pub.sendMore(event.getType().getName().getBytes(StandardCharsets.UTF_8));
            this.pub.sendMore(this.encodedSecret.getBytes(StandardCharsets.UTF_8));
            this.pub.sendMore(event.getId().toString().getBytes(StandardCharsets.UTF_8));
            this.pub.send(event.getRawData().getBytes(StandardCharsets.UTF_8));
        });
    }

    @Override
    public void close() {
        this.socketPollingExecutor.shutdownNow();
        this.socketPublishingExecutor.shutdownNow();
        this.listenerNotificationExecutor.shutdownNow();
        this.poller.close();
        if (this.sub != null) {
            this.sub.close();
        }
        if (this.pub != null) {
            this.pub.close();
        }
    }

    private class PollingRunnable
    implements Runnable {
        private final Secret secret;

        public PollingRunnable(Secret secret) {
            this.secret = secret;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    int count = UnboundZmqEventBus.this.poller.poll(150L);
                    UnboundZmqEventBus.this.pollingStarted.lazySet(true);
                    for (int i = 0; i < count; ++i) {
                        UUID id;
                        Secret eventSecret;
                        ZMQ.Socket socket;
                        EventName eventName;
                        if (!UnboundZmqEventBus.this.poller.pollin(i) || !UnboundZmqEventBus.this.listeners.containsKey(eventName = new EventName(new String((socket = UnboundZmqEventBus.this.poller.getSocket(i)).recv(), StandardCharsets.UTF_8)))) continue;
                        String receivedEventSecret = new String(socket.recv(), StandardCharsets.UTF_8);
                        try {
                            eventSecret = (Secret)JSON.toType(receivedEventSecret, Secret.class);
                        }
                        catch (JsonException ignore) {
                            this.rejectEvent(eventName, receivedEventSecret, "Could not parse event secret, rejecting event.");
                            continue;
                        }
                        String eventId = new String(socket.recv(), StandardCharsets.UTF_8);
                        try {
                            id = UUID.fromString(eventId);
                        }
                        catch (IllegalArgumentException ignore) {
                            this.rejectEvent(eventName, eventId, "Could not parse event id, rejecting event.");
                            continue;
                        }
                        String data = new String(socket.recv(), StandardCharsets.UTF_8);
                        if (UnboundZmqEventBus.this.recentMessages.contains(id)) continue;
                        Object converted = JSON.toType(data, Object.class);
                        Event event = new Event(id, eventName, converted);
                        UnboundZmqEventBus.this.recentMessages.add(id);
                        if (!Secret.matches(this.secret, eventSecret)) {
                            this.rejectEvent(eventName, data, "Rejecting message without a valid secret");
                            continue;
                        }
                        this.notifyListeners(eventName, event);
                    }
                }
                catch (Exception e) {
                    if (e.getCause() instanceof AssertionError) continue;
                    LOG.log(Level.WARNING, e, () -> "Caught exception while polling for event bus messages: " + e.getMessage());
                }
            }
        }

        private void rejectEvent(EventName eventName, String data, String message) {
            Event rejectedEvent = new Event(REJECTED_EVENT, new ZeroMqEventBus.RejectedEvent(eventName, data));
            LOG.log(Level.SEVERE, "{0}. {1}", new Object[]{message, rejectedEvent});
            this.notifyListeners(REJECTED_EVENT, rejectedEvent);
        }

        private void notifyListeners(EventName eventName, Event event) {
            List eventListeners = UnboundZmqEventBus.this.listeners.getOrDefault(eventName, new ArrayList());
            eventListeners.forEach(listener -> UnboundZmqEventBus.this.listenerNotificationExecutor.submit(() -> {
                try {
                    listener.accept(event);
                }
                catch (Exception e) {
                    LOG.log(Level.WARNING, e, () -> "Caught exception from listener: " + listener);
                }
            }));
        }
    }
}

