/*
 * Decompiled with CFR 0.152.
 */
package fish.payara.cloud.connectors.kafka.inbound;

import fish.payara.cloud.connectors.kafka.api.OnRecord;
import fish.payara.cloud.connectors.kafka.api.OnRecords;
import fish.payara.cloud.connectors.kafka.inbound.EndpointKey;
import fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter;
import fish.payara.cloud.connectors.kafka.inbound.KafkaSynchWorker;
import fish.payara.cloud.connectors.kafka.inbound.KafkaWorker;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaAsynchWorker
implements KafkaWorker {
    private static final Logger LOGGER = Logger.getLogger(KafkaAsynchWorker.class.getName());
    private final EndpointKey key;
    private final KafkaConsumer consumer;
    private final WorkManager wm;
    private AtomicBoolean stopped = new AtomicBoolean();
    private final List<Method> onRecordMethods = new ArrayList<Method>();
    private final List<Method> onRecordsMethods = new ArrayList<Method>();
    private final long startTime = new Date().getTime();
    private AtomicBoolean started = new AtomicBoolean();

    public KafkaAsynchWorker(EndpointKey key, WorkManager wm) {
        this.key = key;
        this.wm = wm;
        this.stopped.set(false);
        Class mdbClass = key.getMef().getEndpointClass();
        for (Method m : mdbClass.getMethods()) {
            if (m.isAnnotationPresent(OnRecord.class)) {
                if (m.getParameterCount() == 1 && ConsumerRecord.class.isAssignableFrom(m.getParameterTypes()[0])) {
                    this.onRecordMethods.add(m);
                } else {
                    LOGGER.log(Level.WARNING, "@{0} annotated MDBs must have only one parameter of type {1}. {2}#{3} endpoint will be ignored.", new Object[]{OnRecord.class.getSimpleName(), ConsumerRecord.class.getSimpleName(), mdbClass.getName(), m.getName()});
                }
            }
            if (!m.isAnnotationPresent(OnRecords.class)) continue;
            if (m.getParameterCount() == 1 && ConsumerRecords.class.isAssignableFrom(m.getParameterTypes()[0])) {
                this.onRecordsMethods.add(m);
                continue;
            }
            LOGGER.log(Level.WARNING, "@{0} annotated MDBs must have only one parameter of type {1}. {2}#{3} endpoint will be ignored.", new Object[]{OnRecords.class.getSimpleName(), ConsumerRecords.class.getSimpleName(), mdbClass.getName(), m.getName()});
        }
        this.consumer = new KafkaConsumer(key.getSpec().getConsumerProperties());
        this.consumer.subscribe(Arrays.asList(key.getSpec().getTopics().split(",")));
    }

    @Override
    public void stop() {
        this.stopped.set(true);
    }

    @Override
    public boolean isStopped() {
        return this.stopped.get();
    }

    private void checkStart() {
        long now = System.currentTimeMillis();
        if (now > this.key.getSpec().getInitialPollDelay() + this.startTime) {
            this.started.set(true);
        } else {
            try {
                Thread.sleep(this.key.getSpec().getInitialPollDelay() - (now - this.startTime));
                this.started.set(true);
            }
            catch (InterruptedException e) {
                LOGGER.log(Level.WARNING, "Interrupt during sleep while waiting for initial poll delay");
            }
        }
    }

    public void release() {
        this.stop();
    }

    public void run() {
        if (!this.started.get()) {
            this.checkStart();
        } else {
            ConsumerRecords records = this.consumer.poll(Duration.of(this.key.getSpec().getPollInterval(), ChronoUnit.MILLIS));
            if (!records.isEmpty()) {
                try {
                    this.wm.scheduleWork((Work)new RecordsProcessor((ConsumerRecords<Object, Object>)records));
                }
                catch (WorkException ex) {
                    Logger.getLogger(KafkaAsynchWorker.class.getName()).log(Level.SEVERE, null, ex);
                }
                if (this.key.getSpec().getCommitEachPoll().booleanValue()) {
                    this.consumer.commitSync();
                }
            }
            if (this.stopped.get()) {
                this.consumer.close();
            }
        }
    }

    private class RecordsProcessor
    implements Work {
        private ConsumerRecords<Object, Object> records;
        private MessageEndpoint endpoint;

        RecordsProcessor(ConsumerRecords<Object, Object> records) {
            this.records = records;
        }

        public void release() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                this.endpoint = KafkaAsynchWorker.this.key.getMef().createEndpoint(null);
                for (Method m : KafkaAsynchWorker.this.onRecordsMethods) {
                    OnRecords recordsAnnt = m.getAnnotation(OnRecords.class);
                    try {
                        this.deliverRecords(this.endpoint, m, this.records);
                    }
                    catch (UnavailableException ex) {
                        Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    if (recordsAnnt.matchOtherMethods()) continue;
                    return;
                }
                block11: for (ConsumerRecord record : this.records) {
                    for (Method m : KafkaAsynchWorker.this.onRecordMethods) {
                        OnRecord recordAnnt = m.getAnnotation(OnRecord.class);
                        String[] topics = recordAnnt.topics();
                        if (topics.length != 0 && Arrays.binarySearch(recordAnnt.topics(), record.topic()) < 0) continue;
                        try {
                            this.deliverRecord(this.endpoint, m, (ConsumerRecord<Object, Object>)record);
                        }
                        catch (UnavailableException ex) {
                            Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, null, ex);
                        }
                        if (recordAnnt.matchOtherMethods()) continue;
                        continue block11;
                    }
                }
            }
            catch (UnavailableException ex) {
                Logger.getLogger(KafkaAsynchWorker.class.getName()).log(Level.SEVERE, null, ex);
            }
            finally {
                this.endpoint.release();
            }
        }

        private void deliverRecords(MessageEndpoint endpoint, Method m, ConsumerRecords<Object, Object> records) throws UnavailableException {
            try {
                endpoint.beforeDelivery(m);
                m.invoke((Object)endpoint, records);
                endpoint.afterDelivery();
            }
            catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException | ResourceException ex) {
                Logger.getLogger(KafkaResourceAdapter.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

        private void deliverRecord(MessageEndpoint endpoint, Method m, ConsumerRecord<Object, Object> record) throws UnavailableException {
            try {
                endpoint.beforeDelivery(m);
                m.invoke((Object)endpoint, record);
                endpoint.afterDelivery();
            }
            catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException | ResourceException ex) {
                Logger.getLogger(KafkaResourceAdapter.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

