/*
 * Decompiled with CFR 0.152.
 */
package com.redhat.jenkins.plugins.ci.messaging;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.jenkins.plugins.ci.CIEnvironmentContributingAction;
import com.redhat.jenkins.plugins.ci.messaging.FedMsgMessagingProvider;
import com.redhat.jenkins.plugins.ci.messaging.JMSMessagingWorker;
import com.redhat.jenkins.plugins.ci.messaging.MessagingProviderOverrides;
import com.redhat.jenkins.plugins.ci.messaging.checks.MsgCheck;
import com.redhat.jenkins.plugins.ci.messaging.data.FedmsgMessage;
import com.redhat.jenkins.plugins.ci.messaging.data.SendResult;
import com.redhat.jenkins.plugins.ci.provider.data.FedMsgPublisherProviderData;
import com.redhat.jenkins.plugins.ci.provider.data.FedMsgSubscriberProviderData;
import com.redhat.jenkins.plugins.ci.provider.data.ProviderData;
import com.redhat.utils.PluginUtils;
import hudson.EnvVars;
import hudson.model.Action;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.TaskListener;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

public class FedMsgMessagingWorker
extends JMSMessagingWorker {
    private static final Logger log = Logger.getLogger(FedMsgMessagingWorker.class.getName());
    private final FedMsgMessagingProvider provider;
    public static final String DEFAULT_PREFIX = "org.fedoraproject";
    private ZMQ.Context context;
    private ZMQ.Poller poller;
    private ZMQ.Socket socket;
    private boolean interrupt = false;
    private boolean pollerClosed = false;

    public FedMsgMessagingWorker(FedMsgMessagingProvider fedMsgMessagingProvider, MessagingProviderOverrides overrides, String jobname) {
        this.provider = fedMsgMessagingProvider;
        this.overrides = overrides;
        this.jobname = jobname;
    }

    @Override
    public boolean subscribe(String jobname, String selector) {
        if (this.interrupt) {
            return true;
        }
        this.topic = this.getTopic(this.provider);
        if (this.topic != null) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (!this.isConnected() && !this.connect()) {
                        return false;
                    }
                    if (this.socket == null) {
                        this.socket = this.context.socket(2);
                        log.info("Subscribing job '" + jobname + "' to " + this.topic + " topic.");
                        this.socket.subscribe(this.topic.getBytes());
                        this.socket.setLinger(0L);
                        this.socket.connect(this.provider.getHubAddr());
                        this.poller.register(this.socket, 1);
                        log.info("Successfully subscribed job '" + jobname + "' to topic '" + this.topic + "'.");
                    } else {
                        log.info("Already subscribed job '" + jobname + "' to topic '" + this.topic + "'.");
                    }
                    return true;
                }
                catch (Exception ex) {
                    log.log(Level.SEVERE, "Eexception raised while subscribing job '" + jobname + "', retrying in " + RETRY_MINUTES + " minutes.", ex);
                    if (Thread.currentThread().isInterrupted()) continue;
                    this.unsubscribe(jobname);
                    try {
                        Thread.sleep(RETRY_MINUTES * 60 * 1000);
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        return false;
    }

    @Override
    public void unsubscribe(String jobname) {
        if (this.interrupt) {
            log.info("We are being interrupted. Skipping unsubscribe...");
            return;
        }
        try {
            if (this.poller != null) {
                Integer i = 0;
                while (i < this.poller.getSize()) {
                    ZMQ.Socket s = this.poller.getSocket(i.intValue());
                    this.poller.unregister(s);
                    s.disconnect(this.provider.getHubAddr());
                    log.info("Un-subscribing job '" + jobname + "' from " + this.topic + " topic.");
                    this.socket.unsubscribe(this.topic.getBytes());
                    Integer n = i;
                    Integer n2 = i = Integer.valueOf(i + 1);
                }
                this.socket.close();
            }
            if (this.context != null) {
                this.context.term();
            }
        }
        catch (Exception e) {
            log.warning(e.getMessage());
        }
        this.poller = null;
        this.context = null;
        this.socket = null;
        this.pollerClosed = true;
    }

    @Override
    public void receive(String jobname, ProviderData pdata) {
        block9: {
            FedMsgSubscriberProviderData pd = (FedMsgSubscriberProviderData)pdata;
            int timeoutInMs = (pd.getTimeout() != null ? pd.getTimeout() : FedMsgSubscriberProviderData.DEFAULT_TIMEOUT_IN_MINUTES) * 60 * 1000;
            if (this.interrupt) {
                log.info("we have been interrupted at start of receive");
                return;
            }
            while (!this.subscribe(jobname)) {
                if (Thread.currentThread().isInterrupted()) continue;
                try {
                    int WAIT_SECONDS = 2;
                    Thread.sleep(WAIT_SECONDS * 1000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            ObjectMapper mapper = new ObjectMapper();
            mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
            long lastSeenMessage = new Date().getTime();
            try {
                while (new Date().getTime() - lastSeenMessage < (long)timeoutInMs) {
                    if (this.poller.poll(1000L) > 0) {
                        this.pollerClosed = false;
                        if (!this.poller.pollin(0)) continue;
                        ZMsg z = ZMsg.recvMsg((ZMQ.Socket)this.poller.getSocket(0));
                        lastSeenMessage = new Date().getTime();
                        String json = z.getLast().toString();
                        FedmsgMessage data = (FedmsgMessage)mapper.readValue(json, FedmsgMessage.class);
                        if (!this.provider.verify(data.getBodyJson(), pd.getChecks())) continue;
                        HashMap<String, String> params = new HashMap<String, String>();
                        params.put("CI_MESSAGE", data.getBodyJson());
                        this.trigger(jobname, this.provider.formatMessage(data), params);
                        continue;
                    }
                    if (!this.interrupt) continue;
                    log.info("We have been interrupted...");
                    this.pollerClosed = true;
                    break;
                }
                if (!this.interrupt) {
                    log.info("No message received for the past " + timeoutInMs + " ms, re-subscribing for job '" + jobname + "'.");
                    this.unsubscribe(jobname);
                }
            }
            catch (Exception e) {
                if (Thread.currentThread().isInterrupted()) break block9;
                log.log(Level.WARNING, "JMS exception raised, going to re-subscribe for job '" + jobname + "'.", e);
                this.unsubscribe(jobname);
            }
        }
    }

    @Override
    public boolean connect() {
        this.context = ZMQ.context((int)1);
        this.poller = this.context.poller(1);
        return true;
    }

    @Override
    public boolean isConnected() {
        return this.poller != null;
    }

    @Override
    public boolean isConnectedAndSubscribed() {
        return this.isConnected();
    }

    @Override
    public void disconnect() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SendResult sendMessage(Run<?, ?> build, TaskListener listener, ProviderData pdata) {
        FedMsgPublisherProviderData pd = (FedMsgPublisherProviderData)pdata;
        ZMQ.Context context = ZMQ.context((int)1);
        ZMQ.Socket sock = context.socket(1);
        sock.setLinger(0L);
        log.fine("pub address: " + this.provider.getPubAddr());
        sock.connect(this.provider.getPubAddr());
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        String body = "";
        String msgId = "";
        try {
            EnvVars env = new EnvVars();
            env.putAll((Map)build.getEnvironment(listener));
            env.put("CI_NAME", build.getParent().getName());
            if (!build.isBuilding()) {
                env.put("CI_STATUS", build.getResult() == Result.SUCCESS ? "passed" : "failed");
                env.put("BUILD_STATUS", build.getResult().toString());
            }
            FedmsgMessage fm = new FedmsgMessage(PluginUtils.getSubstitutedValue(this.getTopic(this.provider), build.getEnvironment(listener)), PluginUtils.getSubstitutedValue(pd.getMessageContent(), env));
            body = fm.getBodyJson();
            msgId = fm.getMsgId();
            if (!sock.sendMore(fm.getTopic()) && pd.isFailOnError().booleanValue()) {
                log.severe("Unhandled exception in perform: Failed to send message (topic)!");
                SendResult sendResult = new SendResult(false, msgId, body);
                return sendResult;
            }
            if (!sock.send(body) && pd.isFailOnError().booleanValue()) {
                log.severe("Unhandled exception in perform: Failed to send message (body)!");
                SendResult sendResult = new SendResult(false, msgId, body);
                return sendResult;
            }
            log.fine("JSON message body:\n" + body);
            listener.getLogger().println("JSON message body:\n" + body);
        }
        catch (Exception e) {
            SendResult sendResult;
            if (pd.isFailOnError().booleanValue()) {
                log.severe("Unhandled exception in perform: ");
                log.severe(ExceptionUtils.getStackTrace((Throwable)e));
                listener.fatalError("Unhandled exception in perform: ");
                listener.fatalError(ExceptionUtils.getStackTrace((Throwable)e));
                sendResult = new SendResult(false, msgId, body);
                return sendResult;
            }
            log.warning("Unhandled exception in perform: ");
            log.warning(ExceptionUtils.getStackTrace((Throwable)e));
            listener.error("Unhandled exception in perform: ");
            listener.error(ExceptionUtils.getStackTrace((Throwable)e));
            sendResult = new SendResult(true, msgId, body);
            return sendResult;
        }
        finally {
            sock.close();
            context.term();
        }
        return new SendResult(true, msgId, body);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public String waitForMessage(Run<?, ?> build, TaskListener listener, ProviderData pdata) {
        FedMsgSubscriberProviderData pd = (FedMsgSubscriberProviderData)pdata;
        log.info("Waiting for message.");
        listener.getLogger().println("Waiting for message.");
        for (MsgCheck msgCheck : pd.getChecks()) {
            log.info(" with check: " + msgCheck.toString());
            listener.getLogger().println(" with check: " + msgCheck.toString());
        }
        Integer timeout = pd.getTimeout() != null ? pd.getTimeout() : FedMsgSubscriberProviderData.DEFAULT_TIMEOUT_IN_MINUTES;
        log.info(" with timeout: " + timeout);
        listener.getLogger().println(" with timeout: " + timeout);
        ZMQ.Context lcontext = ZMQ.context((int)1);
        ZMQ.Poller lpoller = lcontext.poller(1);
        ZMQ.Socket lsocket = lcontext.socket(2);
        String ltopic = this.getTopic(this.provider);
        try {
            ltopic = PluginUtils.getSubstitutedValue(this.getTopic(this.provider), build.getEnvironment(listener));
        }
        catch (IOException e) {
            log.warning(e.getMessage());
        }
        catch (InterruptedException e) {
            log.warning(e.getMessage());
        }
        lsocket.subscribe(ltopic.getBytes());
        lsocket.setLinger(0L);
        lsocket.connect(this.provider.getHubAddr());
        lpoller.register(lsocket, 1);
        ObjectMapper mapper = new ObjectMapper();
        long startTime = new Date().getTime();
        int timeoutInMs = timeout * 60 * 1000;
        boolean interrupted = false;
        try {
            ZMsg z;
            while (new Date().getTime() - startTime < (long)timeoutInMs) {
                if (lpoller.poll(1000L) <= 0 || !lpoller.pollin(0)) continue;
                z = ZMsg.recvMsg((ZMQ.Socket)lpoller.getSocket(0));
                listener.getLogger().println("Received a message");
                String json = z.getLast().toString();
                FedmsgMessage data = (FedmsgMessage)mapper.readValue(json, FedmsgMessage.class);
                String body = data.getBodyJson();
                if (!this.provider.verify(body, pd.getChecks())) continue;
                if (build != null && StringUtils.isNotEmpty((CharSequence)pd.getVariable())) {
                    EnvVars vars = new EnvVars();
                    vars.put(pd.getVariable(), body);
                    build.addAction((Action)new CIEnvironmentContributingAction((Map<String, String>)vars));
                }
                String string = body;
                return string;
            }
            if (interrupted) {
                z = null;
                return z;
            }
            log.severe("Timed out waiting for message!");
            listener.getLogger().println("Timed out waiting for message!");
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unhandled exception waiting for message.", e);
        }
        finally {
            try {
                ZMQ.Socket s = lpoller.getSocket(0);
                lpoller.unregister(s);
                s.disconnect(this.provider.getHubAddr());
                lsocket.unsubscribe(ltopic.getBytes());
                lsocket.close();
                lcontext.term();
            }
            catch (Exception e) {
                listener.getLogger().println("exception in finally");
            }
        }
        return null;
    }

    @Override
    public void prepareForInterrupt() {
        this.interrupt = true;
        try {
            while (!this.pollerClosed) {
                if (!Thread.currentThread().isAlive()) {
                    log.info("poller not closed yet BUT trigger thread is dead. continuing interrupt");
                    break;
                }
                try {
                    log.info("poller not closed yet. Sleeping for 1 sec...");
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
            if (this.poller != null) {
                ZMQ.Socket s = this.poller.getSocket(0);
                this.poller.unregister(s);
                s.disconnect(this.provider.getHubAddr());
                log.info("Un-subscribing job '" + this.jobname + "' from " + this.topic + " topic.");
                this.socket.unsubscribe(this.topic.getBytes());
                this.socket.close();
            }
            if (this.context != null) {
                this.context.term();
            }
        }
        catch (Exception e) {
            log.fine(e.getMessage());
        }
        this.poller = null;
        this.socket = null;
        this.interrupt = false;
    }

    @Override
    public boolean isBeingInterrupted() {
        return this.interrupt;
    }
}

