package org.factcast.store.internal.listen;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.util.FactCastJson;
import org.factcast.store.StoreConfigurationProperties;
import org.factcast.store.internal.PgConstants;
import org.factcast.store.internal.PgMetrics;
import org.factcast.store.internal.StoreMetrics;
import org.postgresql.PGNotification;
import org.postgresql.jdbc.PgConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:org/factcast/store/internal/listen/PgListener.class */
public class PgListener implements InitializingBean, DisposableBean {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PgListener.class);

    // Source of the connection the receiver loop issues its LISTEN statements on.
    @NonNull
    private final PgConnectionSupplier pgConnectionSupplier;

    // Bus that every FactInsertionEvent is posted to.
    @NonNull
    private final EventBus eventBus;

    // Supplies the wait times / timeouts used when polling for notifications and when reconnecting.
    @NonNull
    private final StoreConfigurationProperties props;

    // Records missed-roundtrip counts and roundtrip latency of the connection health check.
    @NonNull
    private final PgMetrics pgMetrics;
    // Created and started by listen(); interrupted by destroy(). Null until listen() has run.
    private Thread listenerThread;
    // True until destroy() is called; checked by the receiver loop and by postEvent().
    private final AtomicBoolean running = new AtomicBoolean(true);
    // Counted down by connectionSetup() once the listeners are registered; listen() awaits it.
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    /* loaded from: input_file:org/factcast/store/internal/listen/PgListener$FactInsertionEvent.class */
    /**
     * Immutable value object that {@code PgListener} posts on its event bus.
     *
     * <p>Carries a name plus an optional namespace and type; {@code ns} and {@code type} are
     * {@code null} when the event is posted through the single-argument {@code postEvent}.
     */
    public static final class FactInsertionEvent {
        private final String name;
        private final String ns;
        private final String type;

        /**
         * @param str the event name
         * @param str2 the namespace, may be {@code null}
         * @param str3 the type, may be {@code null}
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public FactInsertionEvent(String str, String str2, String str3) {
            name = str;
            ns = str2;
            type = str3;
        }

        /** @return the event name */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String name() {
            return name;
        }

        /** @return the namespace, possibly {@code null} */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String ns() {
            return ns;
        }

        /** @return the type, possibly {@code null} */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String type() {
            return type;
        }

        /** Two events are equal when name, ns and type are pairwise equal (null-safe). */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof FactInsertionEvent)) {
                return false;
            }
            FactInsertionEvent other = (FactInsertionEvent) obj;
            return Objects.equals(name, other.name)
                    && Objects.equals(ns, other.ns)
                    && Objects.equals(type, other.type);
        }

        /** Combines the three components with multiplier 59; a {@code null} component counts as 43. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public int hashCode() {
            int result = 59 + componentHash(name);
            result = result * 59 + componentHash(ns);
            return result * 59 + componentHash(type);
        }

        /** Hash contribution of a single component. */
        private static int componentHash(String component) {
            return component == null ? 43 : component.hashCode();
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public String toString() {
            return String.format("PgListener.FactInsertionEvent(name=%s, ns=%s, type=%s)", name, ns, type);
        }
    }

    /**
     * Body of the listener thread: keeps one connection open, registers the listeners on it and
     * forwards every received notification to the subscribers until the enclosing
     * {@code PgListener} is stopped.
     *
     * <p>Any failure (obtaining the connection, setting it up, receiving or dispatching
     * notifications) closes the current connection, is logged, and after a pause a new connection
     * is established. Failures seen after shutdown was requested are neither logged nor retried.
     *
     * <p>Decompiler note: this class was originally declared {@code protected}.
     */
    @VisibleForTesting
    public class NotificationReceiverLoop implements Runnable {
        protected NotificationReceiverLoop() {
        }

        @Override
        public void run() {
            // Outer loop: one iteration per connection (re-)establishment.
            while (running.get()) {
                // try-with-resources guarantees the connection is closed on every exit path.
                try (PgConnection pgConnection = pgConnectionSupplier.get()) {
                    connectionSetup(pgConnection);
                    // Inner loop: receive and dispatch until stopped or the connection fails.
                    while (running.get()) {
                        PGNotification[] notifications = receiveNotifications(pgConnection);
                        informSubscriberOfChannelNotifications(notifications);
                    }
                } catch (Exception e) {
                    // Errors during shutdown are expected and therefore ignored.
                    if (running.get()) {
                        log.warn("While waiting for Notifications", e);
                        sleep();
                    }
                }
            }
        }
    }

    /**
     * Starts the daemon thread running {@link NotificationReceiverLoop} and then blocks the caller
     * for at most 15 seconds until the first connection has been set up.
     *
     * <p>Whether or not the listener was established in time is only logged; the method returns
     * normally in both cases. If the calling thread is interrupted while waiting, the wait is
     * abandoned and the thread's interrupt status is restored for the caller.
     */
    @VisibleForTesting
    protected void listen() {
        log.trace("Starting instance Listener");
        listenerThread = new Thread(new NotificationReceiverLoop(), "PG Instance Listener");
        listenerThread.setDaemon(true);
        listenerThread.setUncaughtExceptionHandler((thread, th) -> {
            log.error("thread " + thread + " encountered an unhandled exception", th);
        });
        listenerThread.start();
        try {
            log.info("Waiting to establish postgres listener (max 15sec.)");
            boolean established = countDownLatch.await(15L, TimeUnit.SECONDS);
            log.info("postgres listener " + (established ? "" : "not ") + "established");
        } catch (InterruptedException e) {
            // Do not swallow the interruption: hand it back to the caller via the interrupt flag.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prepares a freshly obtained connection for receiving notifications.
     *
     * <p>The order matters: the listeners are registered first, then the latch that
     * {@code listen()} waits on is released, and finally subscribers are told about the fresh
     * connection.
     *
     * <p>Decompiler note (JADX): access modifier changed from {@code private}.
     *
     * @param pgConnection the connection to register the listeners on
     * @throws SQLException if registering the listeners fails
     */
    public void connectionSetup(PgConnection pgConnection) throws SQLException {
        setupPostgresListeners(pgConnection);
        // Signals listen() that the listener is established.
        this.countDownLatch.countDown();
        informSubscribersAboutFreshConnection();
    }

    /**
     * Executes the two listener-registration statements ({@link PgConstants#LISTEN_SQL} followed
     * by {@link PgConstants#LISTEN_ROUNDTRIP_CHANNEL_SQL}) on the given connection.
     *
     * <p>Each statement is closed as soon as it has been executed, also when execution fails.
     *
     * @param pgConnection the connection to execute the statements on
     * @throws SQLException if preparing, executing or closing either statement fails
     */
    @VisibleForTesting
    protected void setupPostgresListeners(PgConnection pgConnection) throws SQLException {
        try (PreparedStatement listen = pgConnection.prepareStatement(PgConstants.LISTEN_SQL)) {
            listen.execute();
        }
        try (PreparedStatement listenRoundtrip =
                pgConnection.prepareStatement(PgConstants.LISTEN_ROUNDTRIP_CHANNEL_SQL)) {
            listenRoundtrip.execute();
        }
    }

    /**
     * Posts an event named {@link PgConstants#CHANNEL_SCHEDULED_POLL} (without namespace or type).
     * Called by {@code connectionSetup} after the listeners have been registered on a new
     * connection.
     */
    @VisibleForTesting
    protected void informSubscribersAboutFreshConnection() {
        postEvent(PgConstants.CHANNEL_SCHEDULED_POLL);
    }

    /**
     * Dispatches the given notifications, in array order, according to the channel they arrived on.
     *
     * <ul>
     *   <li>{@link PgConstants#CHANNEL_FACT_INSERT}: posts an event, enriched with namespace and
     *       type when the payload can be parsed;
     *   <li>{@link PgConstants#CHANNEL_ROUNDTRIP}: silently ignored;
     *   <li>anything else: ignored with a debug log entry.
     * </ul>
     *
     * @param pGNotificationArr the notifications to dispatch, must not be {@code null}
     */
    @VisibleForTesting
    protected void informSubscriberOfChannelNotifications(PGNotification[] pGNotificationArr) {
        for (PGNotification notification : pGNotificationArr) {
            String channel = notification.getName();
            if (PgConstants.CHANNEL_FACT_INSERT.equals(channel)) {
                postFactInsert(notification);
            } else if (!PgConstants.CHANNEL_ROUNDTRIP.equals(channel)) {
                log.debug("Ignored notification from unknown channel: {}", channel);
            }
        }
    }

    /** Posts the event for one fact-insert notification, with ns/type when the payload has them. */
    private void postFactInsert(PGNotification notification) {
        try {
            JsonNode header = FactCastJson.readTree(notification.getParameter()).get(PgConstants.COLUMN_HEADER);
            String ns = header.get(PgConstants.ALIAS_NS).asText();
            String type = header.get(PgConstants.ALIAS_TYPE).asText();
            log.trace("notifying consumers for '{}' with ns={}, type={}", PgConstants.CHANNEL_FACT_INSERT, ns, type);
            postEvent(PgConstants.CHANNEL_FACT_INSERT, ns, type);
        } catch (JsonProcessingException | NullPointerException unparseable) {
            // Payload is not valid JSON or lacks header/ns/type: fall back to an event without
            // namespace and type so that the insert is still announced.
            log.trace("notifying consumers for '{}'", PgConstants.CHANNEL_FACT_INSERT);
            postEvent(PgConstants.CHANNEL_FACT_INSERT);
        }
    }

    /**
     * Fetches pending notifications from the connection, using the configured blocking wait time
     * as the timeout. When nothing (null or an empty array) comes back, the result of
     * {@link #checkDatabaseConnectionHealthy(PgConnection)} is returned instead.
     *
     * @param pgConnection the connection to read notifications from
     * @return the received notifications, or those returned by the health check
     * @throws SQLException if reading fails or the health check fails
     */
    @VisibleForTesting
    protected PGNotification[] receiveNotifications(PgConnection pgConnection) throws SQLException {
        int waitMillis = props.getFactNotificationBlockingWaitTimeInMillis();
        PGNotification[] received = pgConnection.getNotifications(waitMillis);
        if (received != null && received.length > 0) {
            return received;
        }
        return checkDatabaseConnectionHealthy(pgConnection);
    }

    /**
     * Verifies that the connection still delivers notifications by executing
     * {@link PgConstants#NOTIFY_ROUNDTRIP} and waiting, for at most the configured maximum
     * roundtrip latency, for notifications to arrive.
     *
     * <p>On success the elapsed time is recorded in the roundtrip timer. When nothing arrives in
     * time the missed-roundtrip counter is incremented and an exception is thrown.
     *
     * @param pgConnection the connection to probe
     * @return the notifications received while waiting (never null or empty)
     * @throws SQLException if the statement fails or no notification arrives in time
     */
    @VisibleForTesting
    protected PGNotification[] checkDatabaseConnectionHealthy(PgConnection pgConnection) throws SQLException {
        long startNanos = System.nanoTime();
        // Close the statement right away; it is created on every health check and must not pile up.
        try (PreparedStatement notifyRoundtrip = pgConnection.prepareCall(PgConstants.NOTIFY_ROUNDTRIP)) {
            notifyRoundtrip.execute();
        }
        PGNotification[] notifications =
                pgConnection.getNotifications(props.getFactNotificationMaxRoundTripLatencyInMillis());
        if (notifications == null || notifications.length == 0) {
            pgMetrics.counter(StoreMetrics.EVENT.MISSED_ROUNDTRIP).increment();
            throw new SQLException("Missed roundtrip notification from channel '" + PgConstants.CHANNEL_ROUNDTRIP + "'");
        }
        pgMetrics.timer(StoreMetrics.OP.NOTIFY_ROUNDTRIP).record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
        return notifications;
    }

    /**
     * Pauses the calling thread for the configured new-connection wait time. Used by the receiver
     * loop between a failure and the next connection attempt.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early and the thread's
     * interrupt status is restored.
     */
    @VisibleForTesting
    protected void sleep() {
        try {
            Thread.sleep(props.getFactNotificationNewConnectionWaitTimeInMillis());
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Posts an event that carries only a name; namespace and type are passed as {@code null}.
     *
     * @param str the event name
     */
    @VisibleForTesting
    protected void postEvent(String str) {
        postEvent(str, null, null);
    }

    /**
     * Publishes a {@link FactInsertionEvent} on the event bus, unless this listener has already
     * been stopped, in which case nothing is posted.
     *
     * @param str the event name, must not be {@code null}
     * @param str2 the namespace, may be {@code null}
     * @param str3 the type, may be {@code null}
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @VisibleForTesting
    protected void postEvent(@NonNull String str, @Nullable String str2, @Nullable String str3) {
        Objects.requireNonNull(str, "name is marked non-null but is null");
        if (!running.get()) {
            return;
        }
        FactInsertionEvent event = new FactInsertionEvent(str, str2, str3);
        eventBus.post(event);
    }

    /** {@link InitializingBean} callback: starts listening for notifications. */
    @Override
    public void afterPropertiesSet() {
        listen();
    }

    /**
     * {@link DisposableBean} callback: marks this listener as stopped and interrupts the listener
     * thread, if one has been started.
     */
    @Override
    public void destroy() {
        // Flip the flag first so the receiver loop treats the interruption as a shutdown.
        running.set(false);
        if (listenerThread != null) {
            listenerThread.interrupt();
        }
    }

    /**
     * Creates a listener; nothing is started until {@code afterPropertiesSet()} / {@code listen()}
     * is called.
     *
     * @param pgConnectionSupplier supplies the connection to listen on, must not be {@code null}
     * @param eventBus bus that events are posted to, must not be {@code null}
     * @param storeConfigurationProperties wait times and timeouts, must not be {@code null}
     * @param pgMetrics metrics sink, must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public PgListener(@NonNull PgConnectionSupplier pgConnectionSupplier, @NonNull EventBus eventBus, @NonNull StoreConfigurationProperties storeConfigurationProperties, @NonNull PgMetrics pgMetrics) {
        this.pgConnectionSupplier =
                Objects.requireNonNull(pgConnectionSupplier, "pgConnectionSupplier is marked non-null but is null");
        this.eventBus = Objects.requireNonNull(eventBus, "eventBus is marked non-null but is null");
        this.props = Objects.requireNonNull(storeConfigurationProperties, "props is marked non-null but is null");
        this.pgMetrics = Objects.requireNonNull(pgMetrics, "pgMetrics is marked non-null but is null");
    }
}
