/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.pipeline;

import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.util.Preconditions;
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

/**
 * Fluent builder for a JMS-backed {@link StreamSource}.
 * <p>
 * The builder is created for either a topic or a queue (fixed at construction
 * time) together with a supplier of the JMS {@link ConnectionFactory}. Every
 * step from the factory to the {@link MessageConsumer} can either be
 * customized with a function ({@link #connectionFn}, {@link #sessionFn},
 * {@link #consumerFn}) or left to a default that is derived from the simple
 * parameters ({@link #connectionParams}, {@link #sessionParams},
 * {@link #destinationName}).
 * <p>
 * {@code build} does not modify the builder: defaults are resolved from the
 * builder's state at the moment {@code build} is called, so the same builder
 * can be reconfigured and built again.
 */
public final class JmsSourceBuilder {
    private final DistributedSupplier<? extends ConnectionFactory> factorySupplier;
    private final boolean isTopic;
    private DistributedFunction<? super ConnectionFactory, ? extends Connection> connectionFn;
    private DistributedFunction<? super Connection, ? extends Session> sessionFn;
    private DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn;
    private DistributedConsumer<? super Session> flushFn;
    private String username;
    private String password;
    private boolean transacted;
    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    private String destinationName;

    /**
     * Creates a builder.
     *
     * @param factorySupplier supplier of the JMS connection factory; validated
     *                        with {@code Util.checkSerializable}
     * @param isTopic         {@code true} to build a topic source, {@code false}
     *                        to build a queue source
     */
    JmsSourceBuilder(DistributedSupplier<? extends ConnectionFactory> factorySupplier, boolean isTopic) {
        Util.checkSerializable(factorySupplier, "factorySupplier");
        this.factorySupplier = factorySupplier;
        this.isTopic = isTopic;
    }

    /**
     * Sets the credentials the default connection function passes to
     * {@code ConnectionFactory.createConnection(username, password)}.
     * Has no effect when a custom {@link #connectionFn} is set.
     *
     * @return this builder
     */
    public JmsSourceBuilder connectionParams(String username, String password) {
        this.username = username;
        this.password = password;
        return this;
    }

    /**
     * Sets the function that creates the {@link Connection} from the
     * {@link ConnectionFactory}, replacing the default one (which uses the
     * values given to {@link #connectionParams}).
     *
     * @param connectionFn the connection-creating function; validated with
     *                     {@code Util.checkSerializable}
     * @return this builder
     */
    public JmsSourceBuilder connectionFn(@Nonnull DistributedFunction<? super ConnectionFactory, ? extends Connection> connectionFn) {
        Util.checkSerializable(connectionFn, "connectionFn");
        this.connectionFn = connectionFn;
        return this;
    }

    /**
     * Sets the arguments the default session function passes to
     * {@code Connection.createSession(transacted, acknowledgeMode)}. When not
     * called, {@code transacted} is {@code false} and {@code acknowledgeMode}
     * is {@link Session#AUTO_ACKNOWLEDGE}. Has no effect when a custom
     * {@link #sessionFn} is set.
     *
     * @return this builder
     */
    public JmsSourceBuilder sessionParams(boolean transacted, int acknowledgeMode) {
        this.transacted = transacted;
        this.acknowledgeMode = acknowledgeMode;
        return this;
    }

    /**
     * Sets the function that creates the {@link Session} from the
     * {@link Connection}, replacing the default one (which uses the values
     * given to {@link #sessionParams}).
     *
     * @param sessionFn the session-creating function; validated with
     *                  {@code Util.checkSerializable}
     * @return this builder
     */
    public JmsSourceBuilder sessionFn(@Nonnull DistributedFunction<? super Connection, ? extends Session> sessionFn) {
        Util.checkSerializable(sessionFn, "sessionFn");
        this.sessionFn = sessionFn;
        return this;
    }

    /**
     * Sets the name of the topic or queue. The default consumer function uses
     * it to create the destination, and it appears in the name of the built
     * source. It must be set unless a custom {@link #consumerFn} is provided.
     *
     * @return this builder
     */
    public JmsSourceBuilder destinationName(String destinationName) {
        this.destinationName = destinationName;
        return this;
    }

    /**
     * Sets the function that creates the {@link MessageConsumer} from the
     * {@link Session}, replacing the default one (which creates a consumer for
     * the topic/queue named by {@link #destinationName}).
     *
     * @param consumerFn the consumer-creating function; validated with
     *                   {@code Util.checkSerializable}
     * @return this builder
     */
    public JmsSourceBuilder consumerFn(@Nonnull DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn) {
        Util.checkSerializable(consumerFn, "consumerFn");
        this.consumerFn = consumerFn;
        return this;
    }

    /**
     * Sets the flush function handed to the source processor. When not set,
     * {@code DistributedConsumer.noop()} is used.
     * NOTE(review): when and how often the processor invokes it is defined in
     * {@code SourceProcessors}, not here -- confirm there.
     *
     * @param flushFn the flush function; validated with
     *                {@code Util.checkSerializable}
     * @return this builder
     */
    public JmsSourceBuilder flushFn(@Nonnull DistributedConsumer<? super Session> flushFn) {
        Util.checkSerializable(flushFn, "flushFn");
        this.flushFn = flushFn;
        return this;
    }

    /**
     * Creates the source, applying {@code projectionFn} to each JMS
     * {@link Message} to produce the emitted item.
     * <p>
     * Any function that was not explicitly configured is replaced by its
     * default for this call only; the builder itself is left unchanged, so a
     * later change to {@link #connectionParams}, {@link #sessionParams} or
     * {@link #destinationName} is honoured by a subsequent {@code build}.
     * <p>
     * If no {@link #consumerFn} is set, the destination name must be non-null;
     * this is enforced with {@code Preconditions.checkNotNull}.
     *
     * @param projectionFn maps a received message to the emitted item;
     *                     validated with {@code Util.checkSerializable} like
     *                     the other user-supplied functions
     * @param <T>          type of the items the source emits
     * @return the stream source
     */
    public <T> StreamSource<T> build(@Nonnull DistributedFunction<? super Message, ? extends T> projectionFn) {
        Util.checkSerializable(projectionFn, "projectionFn");

        DistributedFunction<? super ConnectionFactory, ? extends Connection> connectionFnLocal = resolveConnectionFn();
        DistributedFunction<? super Connection, ? extends Session> sessionFnLocal = resolveSessionFn();
        DistributedFunction<? super Session, ? extends MessageConsumer> consumerFnLocal = resolveConsumerFn();
        DistributedConsumer<? super Session> flushFnLocal = flushFn;
        if (flushFnLocal == null) {
            flushFnLocal = DistributedConsumer.noop();
        }

        // Copy the field to a local so the lambda below captures the supplier
        // itself rather than `this` (the builder).
        DistributedSupplier<? extends ConnectionFactory> factorySupplierLocal = factorySupplier;
        DistributedSupplier<Connection> connectionSupplier =
                () -> connectionFnLocal.apply(factorySupplierLocal.get());

        ProcessorMetaSupplier metaSupplier = isTopic
                ? SourceProcessors.streamJmsTopicP(
                        connectionSupplier, sessionFnLocal, consumerFnLocal, flushFnLocal, projectionFn)
                : SourceProcessors.streamJmsQueueP(
                        connectionSupplier, sessionFnLocal, consumerFnLocal, flushFnLocal, projectionFn);
        return new StreamSourceTransform<>(sourceName(), w -> metaSupplier, false);
    }

    /**
     * Creates the source emitting the received {@link Message} objects as-is.
     * Equivalent to {@code build(message -> message)}.
     *
     * @return the stream source
     */
    public StreamSource<Message> build() {
        return build(message -> message);
    }

    /**
     * Returns the configured connection function or, if none was set, a
     * default that calls {@code createConnection(username, password)} with the
     * credentials currently held by the builder.
     */
    private DistributedFunction<? super ConnectionFactory, ? extends Connection> resolveConnectionFn() {
        if (connectionFn != null) {
            return connectionFn;
        }
        // Locals keep the lambda from capturing the builder instance.
        String usernameLocal = username;
        String passwordLocal = password;
        return factory -> factory.createConnection(usernameLocal, passwordLocal);
    }

    /**
     * Returns the configured session function or, if none was set, a default
     * that calls {@code createSession(transacted, acknowledgeMode)} with the
     * session parameters currently held by the builder.
     */
    private DistributedFunction<? super Connection, ? extends Session> resolveSessionFn() {
        if (sessionFn != null) {
            return sessionFn;
        }
        // Locals keep the lambda from capturing the builder instance.
        boolean transactedLocal = transacted;
        int acknowledgeModeLocal = acknowledgeMode;
        return connection -> connection.createSession(transactedLocal, acknowledgeModeLocal);
    }

    /**
     * Returns the configured consumer function or, if none was set, a default
     * that creates a consumer on the topic or queue (depending on the builder
     * kind) named by the current destination name, which must be non-null.
     */
    private DistributedFunction<? super Session, ? extends MessageConsumer> resolveConsumerFn() {
        if (consumerFn != null) {
            return consumerFn;
        }
        // Locals keep the lambda from capturing the builder instance.
        String nameLocal = destinationName;
        boolean isTopicLocal = isTopic;
        Preconditions.checkNotNull(nameLocal, "destinationName must be set when no consumerFn is provided");
        return session -> {
            Destination destination = isTopicLocal
                    ? session.createTopic(nameLocal)
                    : session.createQueue(nameLocal);
            return session.createConsumer(destination);
        };
    }

    /**
     * Builds the display name of the source: {@code jmsTopicSource(<name>)} or
     * {@code jmsQueueSource(<name>)}, with {@code ?} standing in for an unset
     * destination name.
     */
    private String sourceName() {
        String prefix = isTopic ? "jmsTopicSource(" : "jmsQueueSource(";
        String name = destinationName == null ? "?" : destinationName;
        return prefix + name + ')';
    }
}

