/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBinaryOperator;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.connector.HazelcastWriters;
import com.hazelcast.jet.impl.connector.WriteBufferedP;
import com.hazelcast.jet.impl.connector.WriteFileP;
import com.hazelcast.jet.impl.connector.WriteJdbcP;
import com.hazelcast.jet.impl.connector.WriteJmsP;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.EntryProcessor;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.nio.charset.Charset;
import java.sql.PreparedStatement;
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

/**
 * Static factory methods that build processor suppliers for sink vertices.
 * <p>
 * Most methods are thin delegations to the connector implementation classes
 * ({@code HazelcastWriters}, {@code WriteFileP}, {@code WriteBufferedP},
 * {@code WriteJmsP}, {@code WriteJdbcP}). The "remote" variants forward the
 * caller's {@link ClientConfig}, the non-remote variants pass {@code null}
 * in its place (presumably meaning "use the local cluster" — confirm against
 * {@code HazelcastWriters}).
 */
public final class SinkProcessors {

    /** Not instantiable: this class only hosts static factories. */
    private SinkProcessors() {
    }

    /**
     * Returns the supplier created by {@code HazelcastWriters.writeMapSupplier}
     * for the given map, with no client configuration.
     *
     * @param mapName name of the target map
     */
    @Nonnull
    public static ProcessorMetaSupplier writeMapP(@Nonnull String mapName) {
        return HazelcastWriters.writeMapSupplier(mapName, null);
    }

    /**
     * Same as {@link #writeMapP(String)}, but forwards {@code clientConfig}
     * to {@code HazelcastWriters.writeMapSupplier}.
     *
     * @param mapName      name of the target map
     * @param clientConfig client configuration forwarded to the writer
     */
    @Nonnull
    public static ProcessorMetaSupplier writeRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
        return HazelcastWriters.writeMapSupplier(mapName, clientConfig);
    }

    /**
     * Returns the supplier created by {@code HazelcastWriters.mergeMapSupplier}
     * for the given map, with no client configuration.
     *
     * @param mapName   name of the target map
     * @param toKeyFn   extracts the key from an input item
     * @param toValueFn extracts the value from an input item
     * @param mergeFn   binary operator over values, forwarded to the writer
     * @param <T>       input item type
     * @param <K>       key type
     * @param <V>       value type
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier mergeMapP(@Nonnull String mapName, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn) {
        return HazelcastWriters.mergeMapSupplier(mapName, null, toKeyFn, toValueFn, mergeFn);
    }

    /**
     * Same as {@link #mergeMapP}, but forwards {@code clientConfig} to
     * {@code HazelcastWriters.mergeMapSupplier}.
     *
     * @param mapName      name of the target map
     * @param clientConfig client configuration forwarded to the writer
     * @param toKeyFn      extracts the key from an input item
     * @param toValueFn    extracts the value from an input item
     * @param mergeFn      binary operator over values, forwarded to the writer
     * @param <T>          input item type
     * @param <K>          key type
     * @param <V>          value type
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier mergeRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn) {
        return HazelcastWriters.mergeMapSupplier(mapName, clientConfig, toKeyFn, toValueFn, mergeFn);
    }

    /**
     * Returns the supplier created by {@code HazelcastWriters.updateMapSupplier}
     * (bi-function flavour) for the given map, with no client configuration.
     *
     * @param mapName  name of the target map
     * @param toKeyFn  extracts the key from an input item
     * @param updateFn function of (value, item) yielding a value, forwarded to the writer
     * @param <T>      input item type
     * @param <K>      key type
     * @param <V>      value type
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapP(@Nonnull String mapName, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedBiFunction<? super V, ? super T, ? extends V> updateFn) {
        return HazelcastWriters.updateMapSupplier(mapName, null, toKeyFn, updateFn);
    }

    /**
     * Same as {@link #updateMapP(String, DistributedFunction, DistributedBiFunction)},
     * but forwards {@code clientConfig} to {@code HazelcastWriters.updateMapSupplier}.
     *
     * @param mapName      name of the target map
     * @param clientConfig client configuration forwarded to the writer
     * @param toKeyFn      extracts the key from an input item
     * @param updateFn     function of (value, item) yielding a value, forwarded to the writer
     * @param <T>          input item type
     * @param <K>          key type
     * @param <V>          value type
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedBiFunction<? super V, ? super T, ? extends V> updateFn) {
        return HazelcastWriters.updateMapSupplier(mapName, clientConfig, toKeyFn, updateFn);
    }

    /**
     * Returns the supplier created by {@code HazelcastWriters.updateMapSupplier}
     * (entry-processor flavour) for the given map, with no client configuration.
     *
     * @param mapName            name of the target map
     * @param toKeyFn            extracts the key from an input item
     * @param toEntryProcessorFn produces an {@link EntryProcessor} from an input item
     * @param <T>                input item type
     * @param <K>                key type
     * @param <V>                value type
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapP(@Nonnull String mapName, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn) {
        return HazelcastWriters.updateMapSupplier(mapName, null, toKeyFn, toEntryProcessorFn);
    }

    /**
     * Same as {@link #updateMapP(String, DistributedFunction, DistributedFunction)},
     * but forwards {@code clientConfig} to {@code HazelcastWriters.updateMapSupplier}.
     *
     * @param mapName            name of the target map
     * @param clientConfig       client configuration forwarded to the writer
     * @param toKeyFn            extracts the key from an input item
     * @param toEntryProcessorFn produces an {@link EntryProcessor} from an input item
     * @param <T>                input item type
     * @param <K>                key type
     * @param <V>                value type
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn) {
        return HazelcastWriters.updateMapSupplier(mapName, clientConfig, toKeyFn, toEntryProcessorFn);
    }

    /**
     * Returns the supplier created by {@code HazelcastWriters.writeCacheSupplier}
     * for the given cache, with no client configuration.
     *
     * @param cacheName name of the target cache
     */
    @Nonnull
    public static ProcessorMetaSupplier writeCacheP(@Nonnull String cacheName) {
        return HazelcastWriters.writeCacheSupplier(cacheName, null);
    }

    /**
     * Same as {@link #writeCacheP(String)}, but forwards {@code clientConfig}
     * to {@code HazelcastWriters.writeCacheSupplier}.
     *
     * @param cacheName    name of the target cache
     * @param clientConfig client configuration forwarded to the writer
     */
    @Nonnull
    public static ProcessorMetaSupplier writeRemoteCacheP(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig) {
        return HazelcastWriters.writeCacheSupplier(cacheName, clientConfig);
    }

    /**
     * Returns the supplier created by {@code HazelcastWriters.writeListSupplier}
     * for the given list, with no client configuration.
     *
     * @param listName name of the target list
     */
    @Nonnull
    public static ProcessorMetaSupplier writeListP(@Nonnull String listName) {
        return HazelcastWriters.writeListSupplier(listName, null);
    }

    /**
     * Same as {@link #writeListP(String)}, but forwards {@code clientConfig}
     * to {@code HazelcastWriters.writeListSupplier}.
     *
     * @param listName     name of the target list
     * @param clientConfig client configuration forwarded to the writer
     */
    @Nonnull
    public static ProcessorMetaSupplier writeRemoteListP(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
        return HazelcastWriters.writeListSupplier(listName, clientConfig);
    }

    /**
     * Returns a supplier of processors that connect to {@code host:port} and
     * write each received item as one line of text: the result of
     * {@code toStringFn} followed by a single {@code '\n'} character, encoded
     * with {@code charset}.
     * <p>
     * The supplier is built with {@link #writeBufferedP(DistributedFunction,
     * DistributedBiConsumer, DistributedConsumer, DistributedConsumer)} using
     * {@code BufferedWriter::flush} as the flush action and
     * {@code BufferedWriter::close} as the destroy action, and is wrapped with
     * {@code ProcessorMetaSupplier.preferLocalParallelismOne}.
     *
     * @param host       host name to connect to
     * @param port       port to connect to
     * @param toStringFn converts an item to the text written for it; passed
     *                   through {@code Util.checkSerializable} up front
     * @param charset    charset used to encode the text; only its name is
     *                   captured by the created lambdas
     * @param <T>        input item type
     */
    @Nonnull
    public static <T> ProcessorMetaSupplier writeSocketP(@Nonnull String host, int port, @Nonnull DistributedFunction<? super T, ? extends String> toStringFn, @Nonnull Charset charset) {
        Util.checkSerializable(toStringFn, "toStringFn");
        // Capture the charset by name so the lambdas hold a plain String rather than the Charset object.
        String charsetName = charset.name();
        return ProcessorMetaSupplier.preferLocalParallelismOne(SinkProcessors.writeBufferedP(
                context -> openSocketWriter(host, port, charsetName),
                (bufferedWriter, item) -> {
                    // The item type of writeBufferedP is not tied to T here, so the lambda
                    // sees Object; the caller guarantees the items are of type T.
                    @SuppressWarnings("unchecked")
                    T typedItem = (T) item;
                    bufferedWriter.write(toStringFn.apply(typedItem));
                    bufferedWriter.write('\n');
                },
                BufferedWriter::flush,
                BufferedWriter::close));
    }

    /**
     * Opens a socket to {@code host:port} and wraps its output stream in a
     * {@link BufferedWriter} using the named charset. If wrapping fails after
     * the socket has been opened, the socket is closed before the failure is
     * rethrown so the connection does not leak.
     */
    private static BufferedWriter openSocketWriter(String host, int port, String charsetName) throws IOException {
        Socket socket = new Socket(host, port);
        try {
            return new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charsetName));
        } catch (IOException | RuntimeException e) {
            try {
                socket.close();
            } catch (IOException closeFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Returns the supplier created by {@code WriteFileP.metaSupplier} for the
     * given directory.
     *
     * @param directoryName target directory name, forwarded to the writer
     * @param toStringFn    converts an item to text; passed through
     *                      {@code Util.checkSerializable} up front
     * @param charset       charset whose name is forwarded to the writer
     * @param append        append flag, forwarded to the writer
     * @param <T>           input item type
     */
    @Nonnull
    public static <T> ProcessorMetaSupplier writeFileP(@Nonnull String directoryName, @Nonnull DistributedFunction<? super T, ? extends String> toStringFn, @Nonnull Charset charset, boolean append) {
        Util.checkSerializable(toStringFn, "toStringFn");
        return WriteFileP.metaSupplier(directoryName, toStringFn, charset.name(), append);
    }

    /**
     * Convenience overload of {@link #writeBufferedP(DistributedFunction,
     * DistributedBiConsumer, DistributedConsumer, DistributedConsumer)} that
     * uses {@code DistributedConsumer.noop()} as the destroy action.
     *
     * @param createFn    creates the writer object from the processor context
     * @param onReceiveFn invoked with the writer and a received item
     * @param flushFn     invoked with the writer to flush it
     * @param <W>         writer object type
     * @param <T>         input item type
     */
    @Nonnull
    public static <W, T> DistributedSupplier<Processor> writeBufferedP(@Nonnull DistributedFunction<? super Processor.Context, ? extends W> createFn, @Nonnull DistributedBiConsumer<? super W, ? super T> onReceiveFn, @Nonnull DistributedConsumer<? super W> flushFn) {
        return SinkProcessors.writeBufferedP(createFn, onReceiveFn, flushFn, DistributedConsumer.noop());
    }

    /**
     * Returns the supplier created by {@code WriteBufferedP.supplier} from the
     * four given callbacks.
     *
     * @param createFn    creates the writer object from the processor context
     * @param onReceiveFn invoked with the writer and a received item
     * @param flushFn     invoked with the writer to flush it
     * @param destroyFn   invoked with the writer to release it
     * @param <W>         writer object type
     * @param <T>         input item type
     */
    @Nonnull
    public static <W, T> DistributedSupplier<Processor> writeBufferedP(@Nonnull DistributedFunction<? super Processor.Context, ? extends W> createFn, @Nonnull DistributedBiConsumer<? super W, ? super T> onReceiveFn, @Nonnull DistributedConsumer<? super W> flushFn, @Nonnull DistributedConsumer<? super W> destroyFn) {
        return WriteBufferedP.supplier(createFn, onReceiveFn, flushFn, destroyFn);
    }

    /**
     * Returns the supplier created by {@code WriteJmsP.supplier} with its
     * final boolean argument set to {@code false} (the topic variant,
     * {@link #writeJmsTopicP}, passes {@code true}).
     *
     * @param connectionSupplier supplies the JMS connection
     * @param sessionF           creates a session from the connection
     * @param messageFn          creates a message from the session and an item
     * @param sendFn             sends a message using the producer
     * @param flushFn            flush action applied to the session
     * @param name               destination name, forwarded to the writer
     * @param <T>                input item type
     */
    @Nonnull
    public static <T> ProcessorMetaSupplier writeJmsQueueP(@Nonnull DistributedSupplier<? extends Connection> connectionSupplier, @Nonnull DistributedFunction<? super Connection, ? extends Session> sessionF, @Nonnull DistributedBiFunction<? super Session, ? super T, ? extends Message> messageFn, @Nonnull DistributedBiConsumer<? super MessageProducer, ? super Message> sendFn, @Nonnull DistributedConsumer<? super Session> flushFn, @Nonnull String name) {
        return WriteJmsP.supplier(connectionSupplier, sessionF, messageFn, sendFn, flushFn, name, false);
    }

    /**
     * Returns the supplier created by {@code WriteJmsP.supplier} with its
     * final boolean argument set to {@code true} (the queue variant,
     * {@link #writeJmsQueueP}, passes {@code false}).
     *
     * @param connectionSupplier supplies the JMS connection
     * @param sessionF           creates a session from the connection
     * @param messageFn          creates a message from the session and an item
     * @param sendFn             sends a message using the producer
     * @param flushFn            flush action applied to the session
     * @param name               destination name, forwarded to the writer
     * @param <T>                input item type
     */
    @Nonnull
    public static <T> ProcessorMetaSupplier writeJmsTopicP(@Nonnull DistributedSupplier<? extends Connection> connectionSupplier, @Nonnull DistributedFunction<? super Connection, ? extends Session> sessionF, @Nonnull DistributedBiFunction<? super Session, ? super T, ? extends Message> messageFn, @Nonnull DistributedBiConsumer<? super MessageProducer, ? super Message> sendFn, @Nonnull DistributedConsumer<? super Session> flushFn, @Nonnull String name) {
        return WriteJmsP.supplier(connectionSupplier, sessionF, messageFn, sendFn, flushFn, name, true);
    }

    /**
     * Returns the supplier created by {@code WriteJdbcP.metaSupplier}.
     *
     * @param updateQuery        SQL text forwarded to the writer
     * @param connectionSupplier supplies the JDBC connection; passed through
     *                           {@code Util.checkSerializable} up front
     * @param bindFn             binds an item to the {@link PreparedStatement};
     *                           passed through {@code Util.checkSerializable} up front
     * @param <T>                input item type
     */
    @Nonnull
    public static <T> ProcessorMetaSupplier writeJdbcP(@Nonnull String updateQuery, @Nonnull DistributedSupplier<? extends java.sql.Connection> connectionSupplier, @Nonnull DistributedBiConsumer<? super PreparedStatement, ? super T> bindFn) {
        Util.checkSerializable(connectionSupplier, "connectionSupplier");
        Util.checkSerializable(bindFn, "bindFn");
        return WriteJdbcP.metaSupplier(updateQuery, connectionSupplier, bindFn);
    }
}

