/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.EventJournalCacheEvent;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.journal.EventJournalInitialSubscriberState;
import com.hazelcast.internal.journal.EventJournalReader;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.map.EventJournalMapEvent;
import com.hazelcast.nio.serialization.HazelcastSerializationException;
import com.hazelcast.partition.Partition;
import com.hazelcast.ringbuffer.ReadResultSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class StreamEventJournalP<E, T>
extends AbstractProcessor {
    // Fetch-size constant; presumably the upper bound on items returned by one journal read
    // (its value matches the max-size argument used by readFromJournal) - TODO confirm.
    private static final int MAX_FETCH_SIZE = 128;
    // Journal this processor subscribes to and reads from.
    @Nonnull
    private final EventJournalReader<? extends E> eventJournalReader;
    // Filter handed to every journal read (unwrapped from the PredicateEx given to the constructor).
    @Nonnull
    private final Predicate<? super E> predicate;
    // Mapping handed to every journal read (unwrapped from the FunctionEx given to the constructor).
    @Nonnull
    private final Function<? super E, ? extends T> projection;
    // Decides in getSequence() whether reading starts after the newest or at the oldest sequence.
    @Nonnull
    private final JournalInitialPosition initialPos;
    // IDs of the partitions assigned to this processor; emitOffsets, readOffsets and readFutures
    // are parallel arrays indexed the same way.
    @Nonnull
    private final int[] partitionIds;
    // Turns events into output items; only its partition index 0 is ever used by this class.
    @Nonnull
    private final EventTimeMapper<? super T> eventTimeMapper;
    // True when the journal is read through a client; skips the local serialization probe in
    // init() and disables the "instance not active" tolerance in toResultSet().
    private final boolean isRemoteReader;
    // Per partition: sequence following the last event taken out of a result set. This is the
    // offset written to the snapshot.
    @Nonnull
    private final long[] emitOffsets;
    // Per partition: sequence that the next journal read will start from.
    @Nonnull
    private final long[] readOffsets;
    // One in-flight read per partition; null until initialRead() runs on the first complete().
    private CompletableFuture<? extends ReadResultSet<? extends T>>[] readFutures;
    // Result set currently being emitted, or null when there is none.
    @Nullable
    private ReadResultSet<? extends T> resultSet;
    // Index into partitionIds of the partition last polled; -1 means a new polling round
    // has not started yet.
    private int currentPartitionIndex = -1;
    // Index of the next item to take from resultSet.
    private int resultSetPosition;
    // Non-null only while a snapshot is being written; cleared once fully emitted.
    private Traverser<Map.Entry<BroadcastKey<Integer>, long[]>> snapshotTraverser;
    // Items produced by the event-time mapper that still have to be emitted.
    private Traverser<Object> traverser = Traversers.empty();

    StreamEventJournalP(@Nonnull EventJournalReader<? extends E> eventJournalReader, @Nonnull List<Integer> assignedPartitions, @Nonnull PredicateEx<? super E> predicateFn, @Nonnull FunctionEx<? super E, ? extends T> projectionFn, @Nonnull JournalInitialPosition initialPos, boolean isRemoteReader, @Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        this.eventJournalReader = eventJournalReader;
        this.predicate = ImdgUtil.maybeUnwrapImdgPredicate(predicateFn);
        this.projection = ImdgUtil.maybeUnwrapImdgFunction(projectionFn);
        this.initialPos = initialPos;
        this.isRemoteReader = isRemoteReader;
        this.partitionIds = assignedPartitions.stream().mapToInt(Integer::intValue).toArray();
        this.emitOffsets = new long[this.partitionIds.length];
        this.readOffsets = new long[this.partitionIds.length];
        this.eventTimeMapper = new EventTimeMapper<T>(eventTimePolicy);
        assert (this.partitionIds.length > 0) : "no partitions assigned";
        this.eventTimeMapper.addPartitions(1);
    }

    /**
     * Subscribes to the journal of every assigned partition and seeds both offset arrays with
     * the starting sequence chosen by {@link #getSequence}. For a non-remote reader it then
     * round-trips the predicate and projection through the member's serialization service so
     * that a missing class surfaces here as a {@link JetException}.
     *
     * @param context processor context, used to reach the local Hazelcast instance
     * @throws Exception if a subscription future fails or the serialization probe fails
     */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        CompletableFuture[] subscriptions = new CompletableFuture[this.partitionIds.length];
        Arrays.setAll(subscriptions, idx -> this.eventJournalReader.subscribeToEventJournal(this.partitionIds[idx]));
        for (int idx = 0; idx < subscriptions.length; idx++) {
            EventJournalInitialSubscriberState state = (EventJournalInitialSubscriberState) subscriptions[idx].get();
            long startSequence = this.getSequence(state);
            this.readOffsets[idx] = startSequence;
            this.emitOffsets[idx] = startSequence;
        }

        if (this.isRemoteReader) {
            return;
        }

        HazelcastInstanceImpl hzInstance = (HazelcastInstanceImpl) context.jetInstance().getHazelcastInstance();
        InternalSerializationService ss = hzInstance.getSerializationService();
        ClassLoader instanceClassLoader = hzInstance.getClass().getClassLoader();
        try {
            // Serialize and deserialize with the instance's own class loader, predicate first.
            CustomClassLoadedObject.deserializeWithCustomClassLoader(ss, instanceClassLoader, ss.toData(this.predicate));
            CustomClassLoadedObject.deserializeWithCustomClassLoader(ss, instanceClassLoader, ss.toData(this.projection));
        } catch (HazelcastSerializationException e) {
            throw new JetException("The projection or predicate classes are not known to IMDG. It's not enough to add them to the job class path, they must be deployed using User code deployment: " + e, e);
        }
    }

    /**
     * Drives the read/emit cycle. Issues the initial reads on the first call, flushes the
     * pending traverser, then keeps fetching and emitting result sets until either no result
     * set is available or one could not be emitted completely.
     *
     * @return always {@code false}; this method never reports completion
     */
    @Override
    public boolean complete() {
        if (this.readFutures == null) {
            this.initialRead();
        }
        if (!this.emitFromTraverser(this.traverser)) {
            return false;
        }
        while (true) {
            this.tryGetNextResultSet();
            if (this.resultSet == null) {
                // No partition had a finished read with data in this round.
                return false;
            }
            this.emitResultSet();
            if (this.resultSet != null) {
                // Emission stopped part-way; resume from the same position on the next call.
                return false;
            }
        }
    }

    /**
     * Emits the remaining items of the current result set. For each item the partition's emit
     * offset is advanced to the sequence after it, before emission is attempted. Returns early,
     * keeping {@code resultSet} and the position, when the traverser could not be fully
     * emitted; otherwise resets the position and clears {@code resultSet}.
     */
    private void emitResultSet() {
        assert this.resultSet != null : "null resultSet";
        while (this.resultSetPosition < this.resultSet.size()) {
            final int pos = this.resultSetPosition;
            T event = this.resultSet.get(pos);
            this.emitOffsets[this.currentPartitionIndex] = this.resultSet.getSequence(pos) + 1L;
            this.resultSetPosition = pos + 1;
            if (event != null) {
                // Mapper partition index is always 0. Long.MIN_VALUE is passed as the native
                // time - presumably meaning "no native time"; confirm against EventTimeMapper.
                this.traverser = this.eventTimeMapper.flatMapEvent(event, 0, Long.MIN_VALUE);
                if (!this.emitFromTraverser(this.traverser)) {
                    return;
                }
            }
        }
        // The whole result set has been handed over.
        this.resultSetPosition = 0;
        this.resultSet = null;
    }

    /**
     * Writes one snapshot entry per assigned partition: key is the broadcast key of the
     * partition ID, value is {@code {emitOffset, watermark of mapper partition 0}}. Pending
     * output is flushed first.
     *
     * @return {@code true} once all entries were written, {@code false} if the call must be
     *         repeated
     */
    @Override
    public boolean saveToSnapshot() {
        if (!this.emitFromTraverser(this.traverser)) {
            return false;
        }
        if (this.snapshotTraverser == null) {
            // The stream is evaluated lazily, so offsets are read when an entry is emitted.
            this.snapshotTraverser = Traversers.traverseStream(
                    IntStream.range(0, this.partitionIds.length)
                            .mapToObj(pIdx -> Util.entry(
                                    BroadcastKey.broadcastKey(this.partitionIds[pIdx]),
                                    new long[]{this.emitOffsets[pIdx], this.eventTimeMapper.getWatermark(0)})));
        }
        if (!this.emitFromTraverserToSnapshot(this.snapshotTraverser)) {
            return false;
        }
        LoggingUtil.logFinest(this.getLogger(), "Saved snapshot. partitions=%s, offsets=%s, watermark=%d", Arrays.toString(this.partitionIds), Arrays.toString(this.emitOffsets), this.eventTimeMapper.getWatermark(0));
        this.snapshotTraverser = null;
        return true;
    }

    /**
     * Restores the offset and watermark for one partition from a snapshot entry. Entries for
     * partitions not assigned to this processor are ignored.
     *
     * @param key   a {@link BroadcastKey} wrapping the partition ID
     * @param value a {@code long[]} holding {@code {offset, watermark}}
     */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        int partitionId = (Integer) ((BroadcastKey) key).key();
        int idx = com.hazelcast.jet.impl.util.Util.arrayIndexOf(partitionId, this.partitionIds);
        long[] saved = (long[]) value;
        long offset = saved[0];
        long wm = saved[1];
        if (idx < 0) {
            return;
        }
        this.readOffsets[idx] = offset;
        this.emitOffsets[idx] = offset;
        // Watermark is restored into mapper partition 0, the only one this class uses.
        this.eventTimeMapper.restoreWatermark(0, wm);
    }

    /**
     * Logs the restored read offsets at finest level.
     *
     * @return always {@code true}
     */
    @Override
    public boolean finishSnapshotRestore() {
        String partitions = Arrays.toString(this.partitionIds);
        String offsets = Arrays.toString(this.readOffsets);
        LoggingUtil.logFinest(this.getLogger(), "Restored snapshot. partitions=%s, offsets=%s", partitions, offsets);
        return true;
    }

    /**
     * Allocates {@code readFutures} and issues the first journal read for every assigned
     * partition, in index order, starting at that partition's read offset.
     */
    private void initialRead() {
        this.readFutures = new CompletableFuture[this.partitionIds.length];
        Arrays.setAll(this.readFutures, idx -> this.readFromJournal(this.partitionIds[idx], this.readOffsets[idx]));
    }

    /**
     * Picks the sequence to start reading from.
     *
     * @param state subscription state reported by the journal
     * @return the newest sequence plus one for {@code START_FROM_CURRENT}, otherwise the
     *         oldest sequence
     */
    private long getSequence(EventJournalInitialSubscriberState state) {
        if (this.initialPos == JournalInitialPosition.START_FROM_CURRENT) {
            return state.getNewestSequence() + 1L;
        }
        return state.getOldestSequence();
    }

    /**
     * Polls the per-partition read futures, continuing after the partition polled last, until
     * one yields a result set or all partitions have been visited.
     *
     * <p>For every finished future the next read on the same partition is issued immediately.
     * If the returned result set shows a gap between the previous read offset and the first
     * sequence actually read, a warning with the number of lost events is logged. When the
     * round ends without a result set, the partition index is reset and the traverser is set
     * to the mapper's idle output.
     */
    private void tryGetNextResultSet() {
        while (this.resultSet == null && ++this.currentPartitionIndex < this.partitionIds.length) {
            // Must match the element type of readFutures and the parameter of toResultSet.
            CompletableFuture<? extends ReadResultSet<? extends T>> future = this.readFutures[this.currentPartitionIndex];
            if (!future.isDone()) {
                continue;
            }
            // May be null: toResultSet() swallows "instance not active" for a local reader.
            this.resultSet = this.toResultSet(future);
            int partitionId = this.partitionIds[this.currentPartitionIndex];
            if (this.resultSet != null) {
                assert this.resultSet.size() > 0 : "empty resultSet";
                long prevSequence = this.readOffsets[this.currentPartitionIndex];
                // Sequence the read actually started at, minus the one that was requested.
                long lostCount = this.resultSet.getNextSequenceToReadFrom() - (long) this.resultSet.readCount() - prevSequence;
                if (lostCount > 0L) {
                    this.getLogger().warning(lostCount + " events lost for partition " + partitionId + " due to journal overflow when reading from event journal. Increase journal size to avoid this error. nextSequenceToReadFrom=" + this.resultSet.getNextSequenceToReadFrom() + ", readCount=" + this.resultSet.readCount() + ", prevSeq=" + prevSequence);
                }
                this.readOffsets[this.currentPartitionIndex] = this.resultSet.getNextSequenceToReadFrom();
            }
            // Issue the next read on the same partition right away.
            this.readFutures[this.currentPartitionIndex] = this.readFromJournal(partitionId, this.readOffsets[this.currentPartitionIndex]);
        }
        if (this.currentPartitionIndex == this.partitionIds.length) {
            // Full round finished without data: start over next time and emit idle output.
            this.currentPartitionIndex = -1;
            this.traverser = this.eventTimeMapper.flatMapIdle();
        }
    }

    /**
     * Unwraps the result of a journal read.
     *
     * @param future the read future to unwrap
     * @return the result set, or {@code null} if the read failed with
     *         {@link HazelcastInstanceNotActiveException} and this is not a remote reader
     * @throws JetException if the read failed with a serialization error; other failures are
     *                      rethrown through {@code ExceptionUtil.rethrow}
     */
    private ReadResultSet<? extends T> toResultSet(CompletableFuture<? extends ReadResultSet<? extends T>> future) {
        try {
            return future.get();
        } catch (InterruptedException interrupted) {
            throw ExceptionUtil.rethrow(interrupted);
        } catch (ExecutionException e) {
            Throwable cause = ExceptionUtil.peel(e);
            boolean localInstanceInactive = !this.isRemoteReader && cause instanceof HazelcastInstanceNotActiveException;
            if (localInstanceInactive) {
                return null;
            }
            if (cause instanceof HazelcastSerializationException) {
                throw new JetException("Serialization error when reading the journal: are the key, value, predicate and projection classes visible to IMDG? You need to use User Code Deployment, adding the classes to JetConfig isn't enough", e);
            }
            throw ExceptionUtil.rethrow(cause);
        }
    }

    /**
     * Starts an asynchronous journal read for one partition, applying this processor's
     * predicate and projection.
     *
     * @param partition ID of the partition to read
     * @param offset    sequence to start reading from
     * @return future of the read result
     */
    private CompletableFuture<? extends ReadResultSet<? extends T>> readFromJournal(int partition, long offset) {
        // Second and third arguments are 1 and MAX_FETCH_SIZE - presumably the minimum and
        // maximum number of items per read; confirm against EventJournalReader.
        return this.eventJournalReader.readFromEventJournal(offset, 1, MAX_FETCH_SIZE, partition, this.predicate, this.projection).toCompletableFuture();
    }

    /**
     * Builds a meta-supplier of processors that stream the event journal of a map in the
     * local cluster.
     *
     * @param mapName         name of the map whose journal is read
     * @param predicate       event filter; checked to be serializable
     * @param projection      event mapping; checked to be serializable
     * @param initialPos      where reading starts
     * @param eventTimePolicy event-time policy handed to each processor
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamMapSupplier(
            @Nonnull String mapName,
            @Nonnull PredicateEx<? super EventJournalMapEvent<K, V>> predicate,
            @Nonnull FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projection,
            @Nonnull JournalInitialPosition initialPos,
            @Nonnull EventTimePolicy<? super T> eventTimePolicy
    ) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(predicate, "predicate");
        com.hazelcast.jet.impl.util.Util.checkSerializable(projection, "projection");
        // The map proxy is used as the journal reader; no client XML means "local cluster".
        FunctionEx<HazelcastInstance, EventJournalReader<EventJournalMapEvent<K, V>>> journalReaderFn =
                instance -> (EventJournalReader) ((Object) instance.getMap(mapName));
        return new ClusterMetaSupplier<EventJournalMapEvent<K, V>, T>(
                null, journalReaderFn, predicate, projection, initialPos, eventTimePolicy);
    }

    /**
     * Builds a meta-supplier of processors that stream the event journal of a map in a
     * remote cluster reached through a client.
     *
     * @param mapName         name of the map whose journal is read
     * @param clientXml       client configuration, as XML, used to connect to the cluster
     * @param predicate       event filter; checked to be serializable
     * @param projection      event mapping; checked to be serializable
     * @param initialPos      where reading starts
     * @param eventTimePolicy event-time policy handed to each processor
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamRemoteMapSupplier(
            @Nonnull String mapName,
            @Nonnull String clientXml,
            @Nonnull PredicateEx<? super EventJournalMapEvent<K, V>> predicate,
            @Nonnull FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projection,
            @Nonnull JournalInitialPosition initialPos,
            @Nonnull EventTimePolicy<? super T> eventTimePolicy
    ) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(predicate, "predicate");
        com.hazelcast.jet.impl.util.Util.checkSerializable(projection, "projection");
        // The map proxy obtained from the (client) instance is used as the journal reader.
        FunctionEx<HazelcastInstance, EventJournalReader<EventJournalMapEvent<K, V>>> journalReaderFn =
                instance -> (EventJournalReader) ((Object) instance.getMap(mapName));
        return new ClusterMetaSupplier<EventJournalMapEvent<K, V>, T>(
                clientXml, journalReaderFn, predicate, projection, initialPos, eventTimePolicy);
    }

    /**
     * Builds a meta-supplier of processors that stream the event journal of a cache in the
     * local cluster.
     *
     * @param cacheName       name of the cache whose journal is read
     * @param predicate       event filter; checked to be serializable
     * @param projection      event mapping; checked to be serializable
     * @param initialPos      where reading starts
     * @param eventTimePolicy event-time policy handed to each processor
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamCacheSupplier(
            @Nonnull String cacheName,
            @Nonnull PredicateEx<? super EventJournalCacheEvent<K, V>> predicate,
            @Nonnull FunctionEx<? super EventJournalCacheEvent<K, V>, ? extends T> projection,
            @Nonnull JournalInitialPosition initialPos,
            @Nonnull EventTimePolicy<? super T> eventTimePolicy
    ) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(predicate, "predicate");
        com.hazelcast.jet.impl.util.Util.checkSerializable(projection, "projection");
        // The cache proxy is used as the journal reader; no client XML means "local cluster".
        FunctionEx<HazelcastInstance, EventJournalReader<EventJournalCacheEvent<K, V>>> journalReaderFn =
                inst -> (EventJournalReader) ((Object) inst.getCacheManager().getCache(cacheName));
        return new ClusterMetaSupplier<EventJournalCacheEvent<K, V>, T>(
                null, journalReaderFn, predicate, projection, initialPos, eventTimePolicy);
    }

    /**
     * Builds a meta-supplier of processors that stream the event journal of a cache in a
     * remote cluster reached through a client.
     *
     * @param cacheName       name of the cache whose journal is read
     * @param clientXml       client configuration, as XML, used to connect to the cluster
     * @param predicate       event filter; checked to be serializable
     * @param projection      event mapping; checked to be serializable
     * @param initialPos      where reading starts
     * @param eventTimePolicy event-time policy handed to each processor
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamRemoteCacheSupplier(
            @Nonnull String cacheName,
            @Nonnull String clientXml,
            @Nonnull PredicateEx<? super EventJournalCacheEvent<K, V>> predicate,
            @Nonnull FunctionEx<? super EventJournalCacheEvent<K, V>, ? extends T> projection,
            @Nonnull JournalInitialPosition initialPos,
            @Nonnull EventTimePolicy<? super T> eventTimePolicy
    ) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(predicate, "predicate");
        com.hazelcast.jet.impl.util.Util.checkSerializable(projection, "projection");
        // The cache proxy obtained from the (client) instance is used as the journal reader.
        FunctionEx<HazelcastInstance, EventJournalReader<EventJournalCacheEvent<K, V>>> journalReaderFn =
                inst -> (EventJournalReader) ((Object) inst.getCacheManager().getCache(cacheName));
        return new ClusterMetaSupplier<EventJournalCacheEvent<K, V>, T>(
                clientXml, journalReaderFn, predicate, projection, initialPos, eventTimePolicy);
    }

    private static class ClusterProcessorSupplier<E, T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        @Nonnull
        private final List<Integer> ownedPartitions;
        @Nullable
        private final String clientXml;
        @Nonnull
        private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>> eventJournalReaderSupplier;
        @Nonnull
        private final PredicateEx<? super E> predicate;
        @Nonnull
        private final FunctionEx<? super E, ? extends T> projection;
        @Nonnull
        private final JournalInitialPosition initialPos;
        @Nonnull
        private final EventTimePolicy<? super T> eventTimePolicy;
        private transient HazelcastInstance client;
        private transient EventJournalReader<E> eventJournalReader;

        ClusterProcessorSupplier(@Nonnull List<Integer> ownedPartitions, @Nullable String clientXml, @Nonnull FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>> eventJournalReaderSupplier, @Nonnull PredicateEx<? super E> predicate, @Nonnull FunctionEx<? super E, ? extends T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull EventTimePolicy<? super T> eventTimePolicy) {
            this.ownedPartitions = ownedPartitions;
            this.clientXml = clientXml;
            this.eventJournalReaderSupplier = eventJournalReaderSupplier;
            this.predicate = predicate;
            this.projection = projection;
            this.initialPos = initialPos;
            this.eventTimePolicy = eventTimePolicy;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance instance = context.jetInstance().getHazelcastInstance();
            if (this.clientXml != null) {
                instance = this.client = HazelcastClient.newHazelcastClient(ImdgUtil.asClientConfig(this.clientXml));
            }
            this.eventJournalReader = this.eventJournalReaderSupplier.apply(instance);
        }

        @Override
        public void close(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        @Nonnull
        public List<Processor> get(int count) {
            return com.hazelcast.jet.impl.util.Util.distributeObjects(count, this.ownedPartitions).values().stream().map(this::processorForPartitions).collect(Collectors.toList());
        }

        private Processor processorForPartitions(List<Integer> partitions) {
            return partitions.isEmpty() ? Processors.noopP().get() : new StreamEventJournalP<E, T>(this.eventJournalReader, partitions, this.predicate, this.projection, this.initialPos, this.client != null, this.eventTimePolicy);
        }
    }

    private static class ClusterMetaSupplier<E, T>
    implements ProcessorMetaSupplier {
        static final long serialVersionUID = 1L;
        private final String clientXml;
        private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>> eventJournalReaderSupplier;
        private final PredicateEx<? super E> predicate;
        private final FunctionEx<? super E, ? extends T> projection;
        private final JournalInitialPosition initialPos;
        private final EventTimePolicy<? super T> eventTimePolicy;
        private transient int remotePartitionCount;
        private transient Map<Address, List<Integer>> addrToPartitions;

        ClusterMetaSupplier(@Nullable String clientXml, @Nonnull FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>> eventJournalReaderSupplier, @Nonnull PredicateEx<? super E> predicate, @Nonnull FunctionEx<? super E, ? extends T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull EventTimePolicy<? super T> eventTimePolicy) {
            this.clientXml = clientXml;
            this.eventJournalReaderSupplier = eventJournalReaderSupplier;
            this.predicate = predicate;
            this.projection = projection;
            this.initialPos = initialPos;
            this.eventTimePolicy = eventTimePolicy;
        }

        @Override
        public int preferredLocalParallelism() {
            return this.clientXml != null ? 1 : 2;
        }

        @Override
        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            if (this.clientXml != null) {
                this.initRemote();
            } else {
                this.initLocal(context.jetInstance().getHazelcastInstance().getPartitionService().getPartitions());
            }
        }

        private void initRemote() {
            HazelcastInstance client = HazelcastClient.newHazelcastClient(ImdgUtil.asClientConfig(this.clientXml));
            try {
                HazelcastClientProxy clientProxy = (HazelcastClientProxy)client;
                this.remotePartitionCount = clientProxy.client.getClientPartitionService().getPartitionCount();
            }
            finally {
                client.shutdown();
            }
        }

        private void initLocal(Set<Partition> partitions) {
            this.addrToPartitions = partitions.stream().collect(Collectors.groupingBy(p -> p.getOwner().getAddress(), Collectors.mapping(Partition::getPartitionId, Collectors.toList())));
        }

        @Nonnull
        public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            if (this.addrToPartitions == null) {
                this.addrToPartitions = IntStream.range(0, this.remotePartitionCount).boxed().collect(Collectors.groupingBy(partition -> (Address)addresses.get(partition % addresses.size())));
            }
            return address -> new ClusterProcessorSupplier<E, T>(this.addrToPartitions.get(address), this.clientXml, this.eventJournalReaderSupplier, this.predicate, this.projection, this.initialPos, this.eventTimePolicy);
        }
    }
}

