/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.client.impl.proxy.ClientMapProxy;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.connector.AbstractHazelcastConnectorSupplier;
import com.hazelcast.jet.impl.connector.AsyncHazelcastWriterP;
import com.hazelcast.jet.impl.connector.HazelcastWriters;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.partition.PartitioningStrategy;
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/**
 * Sink processor that writes incoming {@link Map.Entry} items into the
 * Hazelcast {@link IMap} named {@code mapName}.
 * <p>
 * Each entry is serialized to {@link Data} as it is drained from the inbox
 * and collected in an in-memory buffer. The buffer is then handed to
 * {@link IMap#putAllAsync} as one batch, provided the superclass grants a
 * permit for another asynchronous operation.
 *
 * @param <K> type of the keys of the incoming entries
 * @param <V> type of the values of the incoming entries
 */
public final class WriteMapP<K, V> extends AsyncHazelcastWriterP {
    /**
     * Soft limit on the number of buffered entries. It is checked only before
     * the inbox is drained, so a single drain may grow the buffer past it.
     */
    private static final int BUFFER_LIMIT = 1024;

    private final String mapName;
    private final SerializationService serializationService;
    private final HazelcastWriters.ArrayMap<Data, Data> buffer = new HazelcastWriters.ArrayMap<>(BUFFER_LIMIT);

    /** Target map; resolved in {@link #init}. */
    private IMap<Data, Data> map;
    /** Serializes one incoming entry and appends it to {@link #buffer}; chosen in {@link #init}. */
    private Consumer<Map.Entry<K, V>> addToBuffer;

    /**
     * @param instance              Hazelcast instance the target map is obtained from
     * @param maxParallelAsyncOps   passed through to the superclass constructor
     * @param mapName               name of the map to write to
     * @param serializationService  service used to convert keys and values to {@link Data}
     */
    private WriteMapP(
            @Nonnull HazelcastInstance instance,
            int maxParallelAsyncOps,
            String mapName,
            @Nonnull SerializationService serializationService
    ) {
        super(instance, maxParallelAsyncOps);
        this.mapName = mapName;
        this.serializationService = serializationService;
    }

    /**
     * Looks up the target map and selects how keys are serialized, depending
     * on the concrete map proxy class.
     *
     * @throws RuntimeException if the map is neither a {@link MapProxyImpl}
     *                          nor a {@link ClientMapProxy}
     */
    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        map = instance().getMap(mapName);
        if (map instanceof MapProxyImpl) {
            // Member-side proxy: serialize the key with the map's own partitioning strategy.
            PartitioningStrategy<?> partitionStrategy = ((MapProxyImpl<?, ?>) map).getPartitionStrategy();
            addToBuffer = entry -> {
                Data key = serializationService.toData(entry.getKey(), partitionStrategy);
                Data value = serializationService.toData(entry.getValue());
                buffer.add(new AbstractMap.SimpleEntry<>(key, value));
            };
        } else if (map instanceof ClientMapProxy) {
            // Client-side proxy: no partitioning strategy is applied when serializing the key.
            addToBuffer = entry -> {
                Data key = serializationService.toData(entry.getKey());
                Data value = serializationService.toData(entry.getValue());
                buffer.add(new AbstractMap.SimpleEntry<>(key, value));
            };
        } else {
            throw new RuntimeException("Unexpected map class: " + map.getClass().getName());
        }
    }

    /**
     * Drains the inbox into the buffer, unless the buffer already holds
     * {@link #BUFFER_LIMIT} or more entries, and then tries to submit the
     * buffer. If the submission is refused the entries stay buffered and the
     * inbox is left untouched on subsequent calls until the buffer is sent.
     */
    @Override
    protected void processInternal(Inbox inbox) {
        if (buffer.size() < BUFFER_LIMIT) {
            inbox.drain(addToBuffer);
        }
        // The result is intentionally ignored: anything not submitted now remains in the buffer.
        submitPending();
    }

    /**
     * Tries to submit whatever is still buffered.
     *
     * @return {@code true} if the buffer is empty after the call,
     *         {@code false} if the entries could not be submitted yet
     */
    @Override
    protected boolean flushInternal() {
        return submitPending();
    }

    /**
     * Sends the buffered entries to the map with a single {@code putAllAsync}
     * call and empties the buffer.
     *
     * @return {@code true} if there was nothing to send or the batch was
     *         submitted; {@code false} if {@code tryAcquirePermit()} refused
     */
    private boolean submitPending() {
        if (buffer.isEmpty()) {
            return true;
        }
        if (!tryAcquirePermit()) {
            return false;
        }
        setCallback(map.putAllAsync(buffer));
        // NOTE(review): clearing right away assumes putAllAsync has already copied the
        // entries out of the buffer by the time it returns - confirm against IMap impl.
        buffer.clear();
        return true;
    }

    /**
     * Creates {@link WriteMapP} processors for the map named {@code mapName}.
     */
    public static class Supplier extends AbstractHazelcastConnectorSupplier {
        private static final long serialVersionUID = 1L;

        /** Budget of parallel async operations that is divided by the local parallelism. */
        private static final int MAX_PARALLELISM = 16;

        private final String mapName;
        /** Per-processor limit, computed in {@link #init}. */
        private int maxParallelAsyncOps;

        /**
         * @param clientXml passed through to the superclass constructor
         * @param mapName   name of the map the created processors write to
         */
        public Supplier(String clientXml, String mapName) {
            super(clientXml);
            this.mapName = mapName;
        }

        /**
         * Initializes the superclass and derives the per-processor limit as
         * {@code MAX_PARALLELISM / localParallelism}, but never less than 1.
         */
        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            super.init(context);
            maxParallelAsyncOps = Integer.max(1, MAX_PARALLELISM / context.localParallelism());
        }

        @Override
        protected Processor createProcessor(HazelcastInstance instance, SerializationService serializationService) {
            return new WriteMapP<>(instance, maxParallelAsyncOps, mapName, serializationService);
        }
    }
}

