/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IList;
import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorMetaSupplier;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.impl.connector.SerializableClientConfig;
import com.hazelcast.jet.processor.Processors;
import com.hazelcast.nio.Address;
import com.hazelcast.partition.strategy.StringPartitioningStrategy;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/**
 * Processor that emits every element of a {@link List} (in practice a
 * Hazelcast {@code IList}, see {@link Supplier}).
 *
 * <p>Lists no larger than the fetch size are traversed directly. Larger
 * lists are read in consecutive {@code subList} chunks of at most
 * {@code fetchSize} elements each. The list size is sampled once, in the
 * constructor.
 *
 * <p>The meta-supplier creates the real reader only on the member whose
 * address equals the owner of the partition derived from the list name;
 * every other member receives a no-op processor.
 */
public final class ReadIListP extends AbstractProcessor {
    /** Fetch size used by the factory methods that do not take one explicitly. */
    private static final int DEFAULT_FETCH_SIZE = 16384;

    /** Lazily yields all items of the list; drained by {@link #complete()}. */
    private final Traverser<Object> traverser;

    /**
     * Creates a reader over {@code list}.
     *
     * @param list      the list to read
     * @param fetchSize maximum number of elements requested per {@code subList} call
     * @throws IllegalArgumentException if the list is larger than {@code fetchSize}
     *                                  and {@code fetchSize} is not positive
     */
    ReadIListP(List<Object> list, int fetchSize) {
        int size = list.size();
        if (size <= fetchSize) {
            // Small enough to be read in one go.
            this.traverser = Traversers.traverseIterable(list);
        } else {
            if (fetchSize <= 0) {
                // Without this check a zero fetch size fails with a bare division
                // by zero and a negative one with an invalid subList range.
                throw new IllegalArgumentException("fetchSize must be positive: " + fetchSize);
            }
            // Ceiling division without overflow; valid because size > fetchSize >= 1.
            // Using the exact chunk count avoids a trailing, always-empty
            // subList(size, size) fetch when size is a multiple of fetchSize.
            int chunkCount = (size - 1) / fetchSize + 1;
            this.traverser = Traversers
                    .traverseStream(IntStream.range(0, chunkCount).mapToObj(chunkIndex -> chunkIndex * fetchSize))
                    // "start + min(fetchSize, remaining)" cannot overflow, unlike "start + fetchSize".
                    .flatMap(start -> Traversers.traverseIterable(
                            list.subList(start, start + Math.min(fetchSize, size - start))));
        }
    }

    /**
     * Emits items from the list traverser.
     *
     * @return the result of {@code emitFromTraverser} for the list traverser
     */
    @Override
    public boolean complete() {
        return this.emitFromTraverser(this.traverser);
    }

    /**
     * Declares this processor non-cooperative.
     * NOTE(review): presumably because reading a distributed list may block - confirm.
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Returns a meta-supplier reading the list named {@code listName} from the
     * local Jet cluster, using the default fetch size.
     *
     * @param listName name of the list to read
     */
    public static ProcessorMetaSupplier supplier(String listName) {
        return new MetaSupplier(listName, DEFAULT_FETCH_SIZE);
    }

    /**
     * Returns a meta-supplier reading the list named {@code listName} from the
     * local Jet cluster.
     *
     * @param listName  name of the list to read
     * @param fetchSize maximum number of elements requested per chunk
     */
    public static ProcessorMetaSupplier supplier(String listName, int fetchSize) {
        return new MetaSupplier(listName, fetchSize);
    }

    /**
     * Returns a meta-supplier reading the list named {@code listName} through a
     * Hazelcast client created from {@code clientConfig}, using the default
     * fetch size.
     *
     * @param listName     name of the list to read
     * @param clientConfig client configuration; when {@code null} the list is
     *                     read from the local Jet cluster instead
     */
    public static ProcessorMetaSupplier supplier(String listName, ClientConfig clientConfig) {
        return new MetaSupplier(listName, clientConfig, DEFAULT_FETCH_SIZE);
    }

    /**
     * Returns a meta-supplier reading the list named {@code listName} through a
     * Hazelcast client created from {@code clientConfig}.
     *
     * @param listName     name of the list to read
     * @param clientConfig client configuration; when {@code null} the list is
     *                     read from the local Jet cluster instead
     * @param fetchSize    maximum number of elements requested per chunk
     */
    public static ProcessorMetaSupplier supplier(String listName, ClientConfig clientConfig, int fetchSize) {
        return new MetaSupplier(listName, clientConfig, fetchSize);
    }

    /**
     * Verifies that exactly one processor instance was requested.
     *
     * @throws IllegalArgumentException if {@code count} is not 1
     */
    private static void assertCountIsOne(int count) {
        if (count != 1) {
            throw new IllegalArgumentException("Supplier of IListReader asked to create more than one processor instance: " + count);
        }
    }

    /**
     * Supplier used on the member selected by {@link MetaSupplier}: resolves
     * the list in {@link #init} and hands out a single {@link ReadIListP}.
     */
    private static class Supplier implements ProcessorSupplier {
        private static final long serialVersionUID = 1L;

        private final String name;
        /** {@code null} means "read from the local cluster". */
        private final SerializableClientConfig clientConfig;
        private final int fetchSize;
        /** Resolved in {@link #init}; not serialized. */
        private transient IList<Object> list;
        /** Client created in {@link #init} for remote reads; stays {@code null} for local reads. */
        private transient HazelcastInstance client;

        Supplier(String name, SerializableClientConfig clientConfig, int fetchSize) {
            this.name = name;
            this.clientConfig = clientConfig;
            this.fetchSize = fetchSize;
        }

        /**
         * Looks up the list, either through a newly created Hazelcast client
         * (when a client configuration is present) or through the Hazelcast
         * instance of the local Jet instance.
         */
        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance instance;
            if (this.isRemote()) {
                // Keep a reference so complete() can shut the client down.
                this.client = HazelcastClient.newHazelcastClient(this.clientConfig.asClientConfig());
                instance = this.client;
            } else {
                instance = context.jetInstance().getHazelcastInstance();
            }
            this.list = instance.getList(this.name);
        }

        /** Returns {@code true} when a client configuration was supplied. */
        private boolean isRemote() {
            return this.clientConfig != null;
        }

        /** Shuts down the client created in {@link #init}, if there is one. */
        @Override
        public void complete(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        /**
         * Creates the single reader processor.
         *
         * @throws IllegalArgumentException if {@code count} is not 1
         */
        @Override
        @Nonnull
        public List<Processor> get(int count) {
            ReadIListP.assertCountIsOne(count);
            return Collections.singletonList(new ReadIListP(this.list, this.fetchSize));
        }
    }

    /**
     * Meta-supplier that assigns the real {@link Supplier} to one member
     * address and a no-op processor supplier to all others.
     */
    private static class MetaSupplier implements ProcessorMetaSupplier {
        private static final long serialVersionUID = 1L;

        private final String name;
        /** {@code null} means "read from the local cluster". */
        private final SerializableClientConfig clientConfig;
        private final int fetchSize;
        /** Computed in {@link #init}; not serialized. */
        private transient Address ownerAddress;

        MetaSupplier(String name, int fetchSize) {
            this(name, null, fetchSize);
        }

        MetaSupplier(String name, ClientConfig clientConfig, int fetchSize) {
            this.name = name;
            this.clientConfig = clientConfig != null ? new SerializableClientConfig(clientConfig) : null;
            this.fetchSize = fetchSize;
        }

        /**
         * Records the address of the member that owns the partition of the
         * list's partition key, as reported by the local cluster's partition
         * service.
         */
        @Override
        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            String partitionKey = StringPartitioningStrategy.getPartitionKey(this.name);
            this.ownerAddress = context.jetInstance().getHazelcastInstance().getPartitionService().getPartition(partitionKey).getOwner().getAddress();
        }

        /**
         * Returns a function mapping the recorded owner address to a reading
         * {@link Supplier} and every other address to a supplier of a single
         * no-op processor.
         */
        @Override
        @Nonnull
        public DistributedFunction<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            return address -> {
                if (address.equals(this.ownerAddress)) {
                    return new Supplier(this.name, this.clientConfig, this.fetchSize);
                }
                // All other members do nothing.
                return c -> {
                    ReadIListP.assertCountIsOne(c);
                    return Collections.singletonList(Processors.noop().get());
                };
            };
        }
    }
}

