/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.ICache;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IList;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.DistributedIntFunction;
import com.hazelcast.jet.impl.connector.SerializableClientConfig;
import com.hazelcast.jet.impl.connector.WriteBufferedP;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

public final class HazelcastWriters {
    private HazelcastWriters() {
    }

    @Nonnull
    public static ProcessorMetaSupplier writeMapP(String name, ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.dontParallelize(new HazelcastWriterSupplier(HazelcastWriters.serializableConfig(clientConfig), index -> new ArrayMap(), ArrayMap::add, instance -> {
            IMap map = instance.getMap(name);
            return buffer -> {
                try {
                    map.putAll(buffer);
                }
                catch (HazelcastInstanceNotActiveException e) {
                    HazelcastWriters.handleInstanceNotActive(instance, e, isLocal);
                }
                buffer.clear();
            };
        }, DistributedFunctions.noopConsumer()));
    }

    @Nonnull
    public static ProcessorMetaSupplier writeCacheP(String name, ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.dontParallelize(new HazelcastWriterSupplier(HazelcastWriters.serializableConfig(clientConfig), index -> new ArrayMap(), ArrayMap::add, CacheFlush.flushToCache(name, isLocal), DistributedFunctions.noopConsumer()));
    }

    @Nonnull
    public static ProcessorMetaSupplier writeListP(String name, ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.dontParallelize(new HazelcastWriterSupplier(HazelcastWriters.serializableConfig(clientConfig), index -> new ArrayList(), ArrayList::add, instance -> {
            IList list = instance.getList(name);
            return buffer -> {
                try {
                    list.addAll(buffer);
                }
                catch (HazelcastInstanceNotActiveException e) {
                    HazelcastWriters.handleInstanceNotActive(instance, e, isLocal);
                }
                buffer.clear();
            };
        }, DistributedFunctions.noopConsumer()));
    }

    private static void handleInstanceNotActive(HazelcastInstance instance, HazelcastInstanceNotActiveException e, boolean isLocal) {
        if (isLocal) {
            instance.getLoggingService().getLogger(HazelcastWriters.class).fine("Ignoring HazelcastInstanceNotActiveException from local cluster as the job will be restarted automatically.", e);
            return;
        }
        throw e;
    }

    private static SerializableClientConfig serializableConfig(ClientConfig clientConfig) {
        return clientConfig != null ? new SerializableClientConfig(clientConfig) : null;
    }

    private static class HazelcastWriterSupplier<B, T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final SerializableClientConfig clientConfig;
        private final DistributedFunction<HazelcastInstance, DistributedConsumer<B>> instanceToFlushBuffer;
        private final DistributedIntFunction<B> bufferSupplier;
        private final DistributedBiConsumer<B, T> addToBuffer;
        private final DistributedConsumer<B> disposeBuffer;
        private transient DistributedConsumer<B> flushBuffer;
        private transient HazelcastInstance client;

        HazelcastWriterSupplier(SerializableClientConfig clientConfig, DistributedIntFunction<B> newBuffer, DistributedBiConsumer<B, T> addToBuffer, DistributedFunction<HazelcastInstance, DistributedConsumer<B>> instanceToFlushBuffer, DistributedConsumer<B> disposeBuffer) {
            this.clientConfig = clientConfig;
            this.instanceToFlushBuffer = instanceToFlushBuffer;
            this.bufferSupplier = newBuffer;
            this.addToBuffer = addToBuffer;
            this.disposeBuffer = disposeBuffer;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance instance = this.isRemote() ? (this.client = HazelcastClient.newHazelcastClient(this.clientConfig.asClientConfig())) : context.jetInstance().getHazelcastInstance();
            this.flushBuffer = (DistributedConsumer)this.instanceToFlushBuffer.apply(instance);
        }

        @Override
        public void complete(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        private boolean isRemote() {
            return this.clientConfig != null;
        }

        @Nonnull
        public List<Processor> get(int count) {
            return Stream.generate(() -> new WriteBufferedP<B, T>(this.bufferSupplier, this.addToBuffer, this.flushBuffer, this.disposeBuffer)).limit(count).collect(Collectors.toList());
        }
    }

    private static final class ArrayMap
    extends AbstractMap<Object, Object> {
        private final List<Map.Entry<Object, Object>> entries;
        private final ArraySet set = new ArraySet();

        ArrayMap() {
            this.entries = new ArrayList<Map.Entry<Object, Object>>();
        }

        @Override
        @Nonnull
        public Set<Map.Entry<Object, Object>> entrySet() {
            return this.set;
        }

        public void add(Map.Entry entry) {
            this.entries.add(entry);
        }

        @Override
        public String toString() {
            return this.entries.toString();
        }

        private class ArraySet
        extends AbstractSet<Map.Entry<Object, Object>> {
            private ArraySet() {
            }

            @Override
            @Nonnull
            public Iterator<Map.Entry<Object, Object>> iterator() {
                return ArrayMap.this.entries.iterator();
            }

            @Override
            public int size() {
                return ArrayMap.this.entries.size();
            }
        }
    }

    private static class CacheFlush {
        private CacheFlush() {
        }

        static DistributedFunction<HazelcastInstance, DistributedConsumer<ArrayMap>> flushToCache(String name, boolean isLocal) {
            return instance -> {
                ICache cache = instance.getCacheManager().getCache(name);
                return buffer -> {
                    try {
                        cache.putAll((Map)buffer);
                    }
                    catch (HazelcastInstanceNotActiveException e) {
                        HazelcastWriters.handleInstanceNotActive(instance, e, isLocal);
                    }
                    buffer.clear();
                };
            };
        }
    }
}

