/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBinaryOperator;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.connector.HazelcastWriters;
import com.hazelcast.jet.impl.connector.WriteBufferedP;
import com.hazelcast.jet.impl.connector.WriteFileP;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.EntryProcessor;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nonnull;

public final class SinkProcessors {
    private SinkProcessors() {
    }

    @Nonnull
    public static ProcessorMetaSupplier writeMapP(@Nonnull String mapName) {
        return HazelcastWriters.writeMapP(mapName, null);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
        return HazelcastWriters.writeMapP(mapName, clientConfig);
    }

    @Nonnull
    public static <E, K, V> ProcessorMetaSupplier mergeMapP(@Nonnull String mapName, @Nonnull DistributedFunction<E, K> toKeyFn, @Nonnull DistributedFunction<E, V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn) {
        return HazelcastWriters.mergeMapP(mapName, null, toKeyFn, toValueFn, mergeFn);
    }

    @Nonnull
    public static <E, K, V> ProcessorMetaSupplier mergeRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedFunction<E, K> toKeyFn, @Nonnull DistributedFunction<E, V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn) {
        return HazelcastWriters.mergeMapP(mapName, clientConfig, toKeyFn, toValueFn, mergeFn);
    }

    @Nonnull
    public static <E, K, V> ProcessorMetaSupplier updateMapP(@Nonnull String mapName, @Nonnull DistributedFunction<E, K> toKeyFn, @Nonnull DistributedBiFunction<V, E, V> updateFn) {
        return HazelcastWriters.updateMapP(mapName, null, toKeyFn, updateFn);
    }

    @Nonnull
    public static <E, K, V> ProcessorMetaSupplier updateRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedFunction<E, K> toKeyFn, @Nonnull DistributedBiFunction<V, E, V> updateFn) {
        return HazelcastWriters.updateMapP(mapName, clientConfig, toKeyFn, updateFn);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapP(@Nonnull String mapName, @Nonnull DistributedFunction<T, K> toKeyFn, @Nonnull DistributedFunction<T, EntryProcessor<K, V>> toEntryProcessorFn) {
        return HazelcastWriters.updateMapP(mapName, null, toKeyFn, toEntryProcessorFn);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedFunction<T, K> toKeyFn, @Nonnull DistributedFunction<T, EntryProcessor<K, V>> toEntryProcessorFn) {
        return HazelcastWriters.updateMapP(mapName, clientConfig, toKeyFn, toEntryProcessorFn);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeCacheP(@Nonnull String cacheName) {
        return HazelcastWriters.writeCacheP(cacheName, null);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeRemoteCacheP(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig) {
        return HazelcastWriters.writeCacheP(cacheName, clientConfig);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeListP(@Nonnull String listName) {
        return HazelcastWriters.writeListP(listName, null);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeRemoteListP(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
        return HazelcastWriters.writeListP(listName, clientConfig);
    }

    public static <T> ProcessorMetaSupplier writeSocketP(@Nonnull String host, int port, @Nonnull DistributedFunction<T, String> toStringFn, @Nonnull Charset charset) {
        String charsetName = charset.name();
        return ProcessorMetaSupplier.preferLocalParallelismOne(SinkProcessors.writeBufferedP(index -> Util.uncheckCall(() -> new BufferedWriter(new OutputStreamWriter(new Socket(host, port).getOutputStream(), charsetName))), (bufferedWriter, item) -> {
            try {
                bufferedWriter.write((String)toStringFn.apply(item));
                bufferedWriter.write(10);
            }
            catch (IOException e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        }, bufferedWriter -> Util.uncheckRun(bufferedWriter::flush), bufferedWriter -> Util.uncheckRun(bufferedWriter::close)));
    }

    @Nonnull
    public static <T> ProcessorMetaSupplier writeFileP(@Nonnull String directoryName, @Nonnull DistributedFunction<T, String> toStringFn, @Nonnull Charset charset, boolean append) {
        return WriteFileP.metaSupplier(directoryName, toStringFn, charset.name(), append);
    }

    @Nonnull
    public static <T> ProcessorMetaSupplier writeFileP(@Nonnull String directoryName, @Nonnull DistributedFunction<T, String> toStringFn) {
        return SinkProcessors.writeFileP(directoryName, toStringFn, StandardCharsets.UTF_8, false);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeFileP(@Nonnull String directoryName) {
        return SinkProcessors.writeFileP(directoryName, Object::toString, StandardCharsets.UTF_8, false);
    }

    @Nonnull
    public static <B, T> DistributedSupplier<Processor> writeBufferedP(@Nonnull DistributedFunction<Processor.Context, B> newBufferFn, @Nonnull DistributedBiConsumer<B, T> addToBufferFn, @Nonnull DistributedConsumer<B> flushBufferFn) {
        return SinkProcessors.writeBufferedP(newBufferFn, addToBufferFn, flushBufferFn, DistributedFunctions.noopConsumer());
    }

    @Nonnull
    public static <W, T> DistributedSupplier<Processor> writeBufferedP(@Nonnull DistributedFunction<? super Processor.Context, ? extends W> createFn, @Nonnull DistributedBiConsumer<? super W, ? super T> onReceiveFn, @Nonnull DistributedConsumer<? super W> flushFn, @Nonnull DistributedConsumer<? super W> destroyFn) {
        return WriteBufferedP.supplier(createFn, onReceiveFn, flushFn, destroyFn);
    }
}

