/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.impl.util.Util;
import java.io.Serializable;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

public final class WriteJmsP {
    private static final int PREFERRED_LOCAL_PARALLELISM = 4;

    private WriteJmsP() {
    }

    public static <T> ProcessorMetaSupplier supplier(String destinationName, SupplierEx<? extends Connection> newConnectionFn, BiFunctionEx<? super Session, T, ? extends Message> messageFn, boolean isTopic) {
        Util.checkSerializable(newConnectionFn, "newConnectionFn");
        Util.checkSerializable(messageFn, "messageFn");
        return ProcessorMetaSupplier.of(4, new Supplier(destinationName, newConnectionFn, messageFn, isTopic));
    }

    private static final class Supplier<T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final SupplierEx<? extends Connection> newConnectionFn;
        private final String name;
        private final boolean isTopic;
        private final BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn;
        private transient Connection connection;

        private Supplier(String destinationName, SupplierEx<? extends Connection> newConnectionFn, BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn, boolean isTopic) {
            this.newConnectionFn = newConnectionFn;
            this.messageFn = messageFn;
            this.name = destinationName;
            this.isTopic = isTopic;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context ignored) throws Exception {
            this.connection = (Connection)this.newConnectionFn.get();
            this.connection.start();
        }

        @Override
        @Nonnull
        public Collection<? extends Processor> get(int count) {
            FunctionEx & Serializable createFn = (FunctionEx & Serializable)jet -> {
                Session session = this.connection.createSession(true, 0);
                Topic destination = this.isTopic ? session.createTopic(this.name) : session.createQueue(this.name);
                MessageProducer producer = session.createProducer((Destination)destination);
                return new JmsContext(session, producer);
            };
            BiConsumerEx & Serializable onReceiveFn = (BiConsumerEx & Serializable)(jmsContext, item) -> {
                Message message = (Message)this.messageFn.apply((Object)((JmsContext)jmsContext).session, item);
                ((JmsContext)jmsContext).producer.send(message);
            };
            SupplierEx<Processor> supplier = SinkProcessors.writeBufferedP(createFn, onReceiveFn, JmsContext::commit, JmsContext::close);
            return Stream.generate(supplier).limit(count).collect(Collectors.toList());
        }

        @Override
        public void close(Throwable error) throws Exception {
            if (this.connection != null) {
                this.connection.close();
            }
        }

        static class JmsContext {
            private final Session session;
            private final MessageProducer producer;

            JmsContext(Session session, MessageProducer producer) {
                this.session = session;
                this.producer = producer;
            }

            public void commit() throws JMSException {
                this.session.commit();
            }

            public void close() throws JMSException {
                this.producer.close();
                this.session.close();
            }
        }
    }
}

