package org.factcast.store.internal.listen;

import com.google.common.eventbus.EventBus;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.assertj.core.api.Assertions;
import org.factcast.store.StoreConfigurationProperties;
import org.factcast.store.internal.PgConstants;
import org.factcast.store.internal.PgMetrics;
import org.factcast.store.internal.listen.PgListener;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.postgresql.PGNotification;
import org.postgresql.core.Notification;
import org.postgresql.jdbc.PgConnection;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/factcast/store/internal/listen/PgListenerTest.class */
public class PgListenerTest {
    /** Matches any captured event-bus payload that is a {@link PgListener.FactInsertionSignal}. */
    private static final Predicate<Object> IS_FACT_INSERT = PgListener.FactInsertionSignal.class::isInstance;

    /**
     * Matches fact-insertion signals whose name is anything other than "scheduled-poll".
     * The argument is cast unconditionally, so this predicate must only be applied to elements
     * that already passed {@link #IS_FACT_INSERT}.
     */
    private static final Predicate<Object> NOT_SCHEDULED_POLL =
            signal -> !((PgListener.FactInsertionSignal) signal).name().equals("scheduled-poll");

    // Source of database connections handed to every PgListener created in these tests.
    @Mock
    private PgConnectionSupplier pgConnectionSupplier;

    // Event bus mock; tests verify which signals the listener posts to it.
    @Mock
    private EventBus eventBus;

    // Deep-stubbed connection so chained calls such as prepareStatement(...).execute() can be stubbed directly.
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private PgConnection conn;

    // Plain statement mock returned from conn.prepareStatement(...) in the listen()-based tests.
    @Mock
    private PreparedStatement ps;

    // Deep-stubbed metrics collaborator passed to the PgListener constructor.
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private PgMetrics registry;
    // Default-constructed store configuration shared by all tests.
    private final StoreConfigurationProperties props = new StoreConfigurationProperties();

    // Captures everything posted to the event bus for later inspection.
    @Captor
    private ArgumentCaptor<Object> factCaptor;

    /**
     * Setting up the listeners must execute exactly five prepared statements on the connection.
     */
    @Test
    public void postgresListenersAreSetup() throws SQLException {
        // conn is a deep stub, so the chained execute() call can be stubbed in one go
        Mockito.when(this.conn.prepareStatement(ArgumentMatchers.anyString()).execute()).thenReturn(true);

        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);
        underTest.setupPostgresListeners(this.conn);

        PreparedStatement statement = this.conn.prepareStatement(ArgumentMatchers.anyString());
        Mockito.verify(statement, Mockito.times(5)).execute();
    }

    /**
     * Informing subscribers about a fresh connection must post at least one
     * {@link PgListener.FactInsertionSignal} named "scheduled-poll" to the event bus.
     */
    @Test
    public void subscribersAreInformedViaInternalEvent() {
        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);
        underTest.informSubscribersAboutFreshConnection();

        Mockito.verify(this.eventBus, Mockito.atLeastOnce()).post(this.factCaptor.capture());

        boolean sawScheduledPoll = false;
        for (Object posted : this.factCaptor.getAllValues()) {
            if (posted instanceof PgListener.FactInsertionSignal
                    && ((PgListener.FactInsertionSignal) posted).name().equals("scheduled-poll")) {
                sawScheduledPoll = true;
                break;
            }
        }
        Assertions.assertThat(sawScheduledPoll).isTrue();
    }

    /**
     * When the connection delivers notifications, they are returned unchanged and the
     * connection health check is never invoked.
     */
    @Test
    public void receivedNotificationsArePassedThrough() throws SQLException {
        PGNotification[] pending = {new Notification("some notification", 1)};
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt())).thenReturn(pending);
        PgListener underTest =
                Mockito.spy(new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry));

        PGNotification[] received = underTest.receiveNotifications(this.conn);

        Mockito.verify(underTest, Mockito.never())
                .checkDatabaseConnectionHealthy(ArgumentMatchers.<PgConnection>any());
        org.junit.jupiter.api.Assertions.assertEquals(1, received.length);
        org.junit.jupiter.api.Assertions.assertEquals("some notification", received[0].getName());
    }

    /**
     * When the connection yields no notifications (stubbed here as {@code null}), the listener
     * must run the connection health check exactly once and return whatever that check returns.
     */
    @Test
    public void whenReceiveTimeoutExpiresHealthCheckIsExecuted() throws SQLException {
        PgListener underTest =
                Mockito.spy(new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry));
        // The null must be typed as PGNotification[] to match the stubbed method's return type.
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt()))
                .thenReturn((PGNotification[]) null);
        // doReturn(..).when(spy) avoids calling the real health check while stubbing it.
        Mockito.doReturn(new PGNotification[]{new Notification("some roundtrip notification", 1)})
                .when(underTest)
                .checkDatabaseConnectionHealthy(ArgumentMatchers.<PgConnection>any());

        PGNotification[] received = underTest.receiveNotifications(this.conn);

        Mockito.verify(underTest, Mockito.times(1))
                .checkDatabaseConnectionHealthy(ArgumentMatchers.<PgConnection>any());
        org.junit.jupiter.api.Assertions.assertEquals(1, received.length);
        org.junit.jupiter.api.Assertions.assertEquals("some roundtrip notification", received[0].getName());
    }

    /**
     * A health check whose roundtrip call succeeds must hand back the notifications that the
     * connection reports.
     */
    @Test
    public void onSuccessfulHealthCheckNotificationsArePassedThrough() throws SQLException {
        Mockito.when(this.conn.prepareCall(PgConstants.NOTIFY_ROUNDTRIP).execute()).thenReturn(true);
        PGNotification[] pending = {new Notification("some notification", 1)};
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt())).thenReturn(pending);

        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);
        PGNotification[] received = underTest.checkDatabaseConnectionHealthy(this.conn);

        org.junit.jupiter.api.Assertions.assertEquals(1, received.length);
        org.junit.jupiter.api.Assertions.assertEquals("some notification", received[0].getName());
    }

    /**
     * If the roundtrip call is issued but the connection reports no notification afterwards
     * (stubbed as {@code null}), the health check must fail with a {@link SQLException}.
     */
    @Test
    public void throwsSqlExceptionWhenNoRoundtripNotificationWasReceived() throws SQLException {
        Mockito.when(this.conn.prepareCall(PgConstants.NOTIFY_ROUNDTRIP).execute()).thenReturn(true);
        // The null must be typed as PGNotification[] to match the stubbed method's return type.
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt()))
                .thenReturn((PGNotification[]) null);
        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);

        // Only the health check itself is expected to throw, so nothing else lives in the lambda.
        org.junit.jupiter.api.Assertions.assertThrows(
                SQLException.class, () -> underTest.checkDatabaseConnectionHealthy(this.conn));
    }

    /**
     * Of two notifications, only the "fact_insert" one results in a post to the event bus,
     * and that post carries a {@link PgListener.FactInsertionSignal}.
     */
    @Test
    public void subscribersAreOnlyInformedAboutNewFactsInDatabase() {
        PGNotification unrelated = new Notification("some notification", 1, "{}");
        PGNotification factInsert = new Notification("fact_insert", 1, "{}");
        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);

        underTest.processNotifications(new PGNotification[]{unrelated, factInsert});

        Mockito.verify(this.eventBus, Mockito.times(1)).post(this.factCaptor.capture());
        boolean sawFactInsertion = false;
        for (Object posted : this.factCaptor.getAllValues()) {
            if (posted instanceof PgListener.FactInsertionSignal) {
                sawFactInsertion = true;
                break;
            }
        }
        Assertions.assertThat(sawFactInsertion).isTrue();
    }

    /**
     * Notifications on channels other than "fact_insert" must not produce any
     * {@link PgListener.FactInsertionSignal} on the event bus.
     */
    @Test
    public void otherNotificationsAreIgnored() {
        PGNotification[] unrelated = {
            new Notification("some notification", 1, "{}"),
            new Notification("some other notification", 1, "{}")
        };
        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);

        underTest.processNotifications(unrelated);

        Mockito.verify(this.eventBus, Mockito.never())
                .post(ArgumentMatchers.any(PgListener.FactInsertionSignal.class));
    }

    /**
     * The receiver loop must keep retrying when acquiring a connection fails: the supplier is
     * stubbed to throw a {@link SQLException}, then a {@link RuntimeException}, then an
     * {@link Error}. Only the {@link Error} is expected to escape {@code run()}, after exactly
     * three attempts to obtain a connection.
     */
    @Test
    public void notificationLoopHandlesSqlException() throws SQLException {
        Mockito.when(this.pgConnectionSupplier.get())
                .thenThrow(SQLException.class, RuntimeException.class, Error.class);
        PgListener pgListener =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);
        // The loop is created as an inner-class instance bound to the listener under test.
        PgListener.NotificationReceiverLoop notificationReceiverLoop =
                pgListener.new NotificationReceiverLoop();

        org.junit.jupiter.api.Assertions.assertThrows(Error.class, notificationReceiverLoop::run);

        Mockito.verify(this.pgConnectionSupplier, Mockito.times(3)).get();
    }

    /**
     * Runs the listener against a scripted sequence of notification batches and checks which
     * signals reach the event bus.
     *
     * <p>The connection returns seven batches in order; every later call counts down the latch
     * and returns {@code null}. The test expects a "scheduled-poll" signal, nothing but
     * fact-insertion signals apart from blacklist-change signals, and exactly seven
     * non-scheduled-poll fact insertions (the notification with the malformed payload "{"
     * contributes none).
     */
    @Test
    void testNotify() throws Exception {
        CountDownLatch scriptExhausted = new CountDownLatch(1);
        Mockito.when(this.pgConnectionSupplier.get()).thenReturn(this.conn);
        Mockito.when(this.conn.prepareStatement(ArgumentMatchers.anyString())).thenReturn(this.ps);
        Mockito.when(this.conn.prepareCall(ArgumentMatchers.anyString()).execute()).thenReturn(true);
        // Each argument is one batch returned by a successive getNotifications(..) call.
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt()))
                .thenReturn(
                        new PGNotification[]{
                            new Notification("fact_insert", 1, "{}"),
                            new Notification("fact_insert", 1, "{"),
                            new Notification("fact_insert", 1, "{\"ns\":\"namespace\",\"type\":\"theType\"}"),
                            new Notification("fact_insert", 1, "{\"ns\":\"namespace\",\"type\":\"theOtherType\"}")
                        },
                        new PGNotification[]{
                            new Notification("fact_insert", 2, "{}"),
                            new Notification("fact_insert", 1, "{\"ns\":\"namespace\",\"type\":\"theOtherType\"}"),
                            new Notification("fact_insert", 1, "{\"ns\":\"namespace\",\"type\":\"theType\"}")
                        },
                        new PGNotification[]{new Notification("fact_insert", 3, "{}")},
                        new PGNotification[]{new Notification(PgConstants.CHANNEL_ROUNDTRIP, 3, "{}")},
                        new PGNotification[0],
                        new PGNotification[0],
                        new PGNotification[0])
                .thenAnswer(invocationOnMock -> {
                    scriptExhausted.countDown();
                    return null;
                });

        PgListener pgListener =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);
        pgListener.listen();
        boolean consumedAllBatches = scriptExhausted.await(1L, TimeUnit.MINUTES);
        pgListener.destroy();
        // Fail with a clear message instead of asserting against a half-processed script.
        org.junit.jupiter.api.Assertions.assertTrue(
                consumedAllBatches, "listener did not consume all scripted notification batches within one minute");

        Mockito.verify(this.eventBus, Mockito.atLeastOnce()).post(this.factCaptor.capture());
        List<Object> allValues = this.factCaptor.getAllValues();

        Assertions.assertThat(allValues.stream().anyMatch(posted ->
                (posted instanceof PgListener.FactInsertionSignal)
                        && ((PgListener.FactInsertionSignal) posted).name().equals("scheduled-poll")))
                .isTrue();
        org.junit.jupiter.api.Assertions.assertTrue(allValues.stream()
                .filter(posted -> !(posted instanceof PgListener.BlacklistChangeSignal))
                .allMatch(IS_FACT_INSERT));
        Assertions.assertThat(allValues.stream().filter(IS_FACT_INSERT).filter(NOT_SCHEDULED_POLL))
                .containsExactlyInAnyOrder(
                        new PgListener.FactInsertionSignal("fact_insert", (String) null, (String) null),
                        new PgListener.FactInsertionSignal("fact_insert", (String) null, (String) null),
                        new PgListener.FactInsertionSignal("fact_insert", (String) null, (String) null),
                        new PgListener.FactInsertionSignal("fact_insert", "namespace", "theType"),
                        new PgListener.FactInsertionSignal("fact_insert", "namespace", "theOtherType"),
                        new PgListener.FactInsertionSignal("fact_insert", "namespace", "theType"),
                        new PgListener.FactInsertionSignal("fact_insert", "namespace", "theOtherType"));
    }

    /**
     * Feeds the listener one batch holding three "schemastore_change" notifications (one of
     * them without a version) and one "fact_insert" notification. Exactly two schema-store
     * change signals are expected, for versions 1 and 2 of namespace/theType, both on the
     * listener's post method and on the event bus.
     */
    @Test
    void testNotifySchemaStoreChange() throws Exception {
        CountDownLatch batchConsumed = new CountDownLatch(1);
        PGNotification versionOne = new Notification("schemastore_change", 1, "{\"ns\":\"namespace\",\"type\":\"theType\",\"version\":1}");
        PGNotification withoutVersion = new Notification("schemastore_change", 1, "{\"ns\":\"namespace\",\"type\":\"theType\"}");
        PGNotification factInsert = new Notification("fact_insert", 1, "{\"ns\":\"namespace\",\"type\":\"theType\",\"version\":1}");
        PGNotification versionTwo = new Notification("schemastore_change", 1, "{\"ns\":\"namespace\",\"type\":\"theType\",\"version\":2}");
        Mockito.when(this.pgConnectionSupplier.get()).thenReturn(this.conn);
        Mockito.when(this.conn.prepareStatement(ArgumentMatchers.anyString())).thenReturn(this.ps);
        // First call delivers the batch; every later call releases the latch and returns null.
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt()))
                .thenReturn(new PGNotification[]{versionOne, withoutVersion, factInsert, versionTwo})
                .thenAnswer(invocationOnMock -> {
                    batchConsumed.countDown();
                    return null;
                });

        PgListener pgListener =
                Mockito.spy(new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry));
        pgListener.listen();
        boolean polledAgain = batchConsumed.await(1L, TimeUnit.MINUTES);
        pgListener.destroy();
        // Fail with a clear message instead of verifying against a listener that never got that far.
        org.junit.jupiter.api.Assertions.assertTrue(
                polledAgain, "listener did not poll for notifications a second time within one minute");

        Mockito.verify(pgListener, Mockito.times(2))
                .postSchemaStoreChangeSignal(ArgumentMatchers.<PgListener.SchemaStoreChangeSignal>any());
        Mockito.verify(pgListener, Mockito.times(1))
                .postSchemaStoreChangeSignal(new PgListener.SchemaStoreChangeSignal("namespace", "theType", 1));
        Mockito.verify(pgListener, Mockito.times(1))
                .postSchemaStoreChangeSignal(new PgListener.SchemaStoreChangeSignal("namespace", "theType", 2));
        Mockito.verify(this.eventBus, Mockito.times(2))
                .post(ArgumentMatchers.any(PgListener.SchemaStoreChangeSignal.class));
    }

    /**
     * Feeds the listener one batch holding three "transformationstore_change" notifications
     * (one of them using "invalidTypeKey" instead of "type") and one "fact_insert"
     * notification. Exactly two transformation-store change signals are expected, for
     * namespace/theType and namespace/theOtherType, both on the listener's post method and on
     * the event bus.
     */
    @Test
    void testNotifyTransformationStoreChange() throws Exception {
        CountDownLatch batchConsumed = new CountDownLatch(1);
        PGNotification forTheType = new Notification("transformationstore_change", 1, "{\"ns\":\"namespace\",\"type\":\"theType\"}");
        PGNotification withInvalidKey = new Notification("transformationstore_change", 1, "{\"ns\":\"namespace\",\"invalidTypeKey\":\"theType\"}");
        PGNotification factInsert = new Notification("fact_insert", 1, "{\"ns\":\"namespace\",\"type\":\"theType\"}");
        PGNotification forTheOtherType = new Notification("transformationstore_change", 1, "{\"ns\":\"namespace\",\"type\":\"theOtherType\"}");
        Mockito.when(this.pgConnectionSupplier.get()).thenReturn(this.conn);
        Mockito.when(this.conn.prepareStatement(ArgumentMatchers.anyString())).thenReturn(this.ps);
        // First call delivers the batch; every later call releases the latch and returns null.
        Mockito.when(this.conn.getNotifications(ArgumentMatchers.anyInt()))
                .thenReturn(new PGNotification[]{forTheType, withInvalidKey, factInsert, forTheOtherType})
                .thenAnswer(invocationOnMock -> {
                    batchConsumed.countDown();
                    return null;
                });

        PgListener pgListener =
                Mockito.spy(new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry));
        pgListener.listen();
        boolean polledAgain = batchConsumed.await(1L, TimeUnit.MINUTES);
        pgListener.destroy();
        // Fail with a clear message instead of verifying against a listener that never got that far.
        org.junit.jupiter.api.Assertions.assertTrue(
                polledAgain, "listener did not poll for notifications a second time within one minute");

        Mockito.verify(pgListener, Mockito.times(2))
                .postTransformationStoreChangeSignal(ArgumentMatchers.<PgListener.TransformationStoreChangeSignal>any());
        Mockito.verify(pgListener, Mockito.times(1))
                .postTransformationStoreChangeSignal(new PgListener.TransformationStoreChangeSignal("namespace", "theType"));
        Mockito.verify(pgListener, Mockito.times(1))
                .postTransformationStoreChangeSignal(new PgListener.TransformationStoreChangeSignal("namespace", "theOtherType"));
        Mockito.verify(this.eventBus, Mockito.times(2))
                .post(ArgumentMatchers.any(PgListener.TransformationStoreChangeSignal.class));
    }

    /**
     * Starting the listener via {@code afterPropertiesSet()} and then destroying it must close
     * the connection it obtained. The short pause gives the listener time to react before the
     * verification runs.
     */
    @Test
    void testConnectionIsStopped() throws Exception {
        PreparedStatement statement = Mockito.mock(PreparedStatement.class);
        Mockito.when(this.pgConnectionSupplier.get()).thenReturn(this.conn);
        Mockito.when(this.conn.prepareStatement(ArgumentMatchers.anyString())).thenReturn(statement);

        PgListener underTest =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);
        underTest.afterPropertiesSet();
        underTest.destroy();

        sleep(150);
        Mockito.verify(this.conn).close();
    }

    /**
     * Destroying a listener that was never started must not touch the connection mock.
     */
    @Test
    void testStopWithoutStarting() {
        PgListener neverStarted =
                new PgListener(this.pgConnectionSupplier, this.eventBus, this.props, this.registry);

        neverStarted.destroy();

        Mockito.verifyNoMoreInteractions(this.conn);
    }

    /**
     * Pauses the current thread for the given number of milliseconds.
     *
     * <p>If the thread is interrupted while sleeping, the pause ends early and the interrupt
     * flag is restored so that the caller (or the test runner) can still observe it.
     *
     * @param i pause length in milliseconds
     */
    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag for code further up the stack.
            Thread.currentThread().interrupt();
        }
    }
}
