/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.pgevent;

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.pgevent.PgEventEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.task.BackgroundTask;
import org.apache.camel.support.task.TaskRunFailureException;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.support.task.budget.TimeBudget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PgEventConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(PgEventConsumer.class);
    private final PgEventListener listener = new PgEventListener();
    private final PgEventEndpoint endpoint;
    private PGConnection dbConnection;
    private ScheduledExecutorService reconnectPool;
    private BackgroundTask reconnectTask;
    private ExecutorService workerPool;
    private boolean shutdownWorkerPool;

    public PgEventConsumer(PgEventEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
    }

    public PgEventListener getPgEventListener() {
        return this.listener;
    }

    protected void doInit() throws Exception {
        if (this.endpoint.getWorkerPool() != null) {
            this.workerPool = this.endpoint.getWorkerPool();
        } else {
            this.workerPool = this.endpoint.createWorkerPool((Object)this);
            this.shutdownWorkerPool = true;
        }
        this.reconnectPool = this.getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor((Object)this, "PgEventReconnect");
        this.reconnectTask = (BackgroundTask)Tasks.backgroundTask().withScheduledExecutor(this.reconnectPool).withBudget((TimeBudget)Budgets.iterationTimeBudget().withInterval(Duration.ofMillis(this.endpoint.getReconnectDelay())).withInitialDelay(Duration.ofSeconds(1L)).withUnlimitedDuration().build()).withName("PgEventReconnect").build();
    }

    protected void doStart() throws Exception {
        this.listener.initConnection();
        super.doStart();
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.listener.closeConnection();
        this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdown((ExecutorService)this.reconnectPool);
        if (this.shutdownWorkerPool && this.workerPool != null) {
            LOG.debug("Shutting down PgEventConsumer worker threads with timeout {} millis", (Object)10000);
            this.endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(this.workerPool, 10000L);
            this.workerPool = null;
        }
    }

    public class PgEventListener
    implements PGNotificationListener {
        public void reconnect() {
            if (!PgEventConsumer.this.reconnectTask.isRunning()) {
                PgEventConsumer.this.reconnectTask.run(PgEventConsumer.this.endpoint.getCamelContext(), () -> {
                    if (PgEventConsumer.this.isRunAllowed()) {
                        LOG.debug("Connecting attempt #{}", (Object)PgEventConsumer.this.reconnectTask.iteration());
                        try {
                            this.initConnection();
                        }
                        catch (Exception e) {
                            String message = "Failed to connect attempt #" + PgEventConsumer.this.reconnectTask.iteration() + " due to: " + e.getMessage();
                            PgEventConsumer.this.getExceptionHandler().handleException(message, (Throwable)e);
                            throw new TaskRunFailureException(message, (Throwable)e);
                        }
                        LOG.debug("Connecting successful");
                    }
                    return false;
                });
            }
        }

        public void initConnection() throws Exception {
            PgEventConsumer.this.dbConnection = PgEventConsumer.this.endpoint.initJdbc();
            String channel = PgEventConsumer.this.endpoint.getChannel();
            if (!channel.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
                throw new IllegalArgumentException("Invalid channel name");
            }
            String sql = String.format("LISTEN %s", channel);
            try (PreparedStatement statement = PgEventConsumer.this.dbConnection.prepareStatement(sql);){
                statement.execute();
            }
            PgEventConsumer.this.dbConnection.addNotificationListener(PgEventConsumer.this.endpoint.getChannel(), PgEventConsumer.this.endpoint.getChannel(), (PGNotificationListener)PgEventConsumer.this.listener);
        }

        public void closeConnection() throws Exception {
            if (PgEventConsumer.this.dbConnection != null) {
                try {
                    String channel = PgEventConsumer.this.endpoint.getChannel();
                    if (!channel.matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
                        throw new IllegalArgumentException("Invalid channel name");
                    }
                    PgEventConsumer.this.dbConnection.removeNotificationListener(channel);
                    String sql = String.format("UNLISTEN %s", channel);
                    try (PreparedStatement statement = PgEventConsumer.this.dbConnection.prepareStatement(sql);){
                        statement.execute();
                    }
                    PgEventConsumer.this.dbConnection.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            PgEventConsumer.this.dbConnection = null;
        }

        public void notification(int processId, String channel, String payload) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Notification processId: {}, channel: {}, payload: {}", new Object[]{processId, channel, payload});
            }
            Exchange exchange = PgEventConsumer.this.createExchange(false);
            Message msg = exchange.getIn();
            msg.setHeader("channel", (Object)channel);
            msg.setBody((Object)payload);
            if (PgEventConsumer.this.workerPool != null) {
                PgEventConsumer.this.workerPool.submit(() -> {
                    try {
                        PgEventConsumer.this.getProcessor().process(exchange);
                    }
                    catch (Exception e) {
                        exchange.setException((Throwable)e);
                    }
                    if (exchange.getException() != null) {
                        String cause = "Unable to process incoming notification from PostgreSQL: processId='" + processId + "', channel='" + channel + "', payload='" + payload + "'";
                        PgEventConsumer.this.getExceptionHandler().handleException(cause, (Throwable)exchange.getException());
                    }
                    PgEventConsumer.this.releaseExchange(exchange, false);
                });
            }
        }

        public void closed() {
            LOG.warn("Connection to PostgreSQL lost unexpected. Re-connecting...");
            this.reconnect();
        }
    }
}

