/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import io.reactivex.Flowable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.BaseCacheStream;
import org.infinispan.CacheStream;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.util.ByRef;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.RangeSet;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.intops.FlatMappingOperation;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.stream.impl.termop.BaseTerminalOperation;
import org.infinispan.stream.impl.termop.SegmentRetryingOperation;
import org.infinispan.stream.impl.termop.SingleRunOperation;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.logging.Log;
import org.jboss.marshalling.util.IdentityIntMap;
import org.reactivestreams.Publisher;

public abstract class AbstractCacheStream<Original, T, S extends BaseStream<T, S>, S2 extends S>
implements BaseStream<T, S> {
    // Pipeline of intermediate operations queued on this stream (appended to by addIntermediateOperation).
    protected final Queue<IntermediateOperation> intermediateOperations;
    // Address of the local node; compared against consistent-hash members and primary owners.
    protected final Address localAddress;
    // Source of the consistent hash / cache topology used by the terminal operations.
    protected final DistributionManager dm;
    // Supplies the local cache stream that supplierForSegments() narrows by segment and key.
    protected final Supplier<CacheStream<Original>> supplier;
    // Used to start remote stream operations, await their completion and forget them afterwards.
    protected final ClusterStreamManager csm;
    // Not used by the code in this class; presumably consumed by subclasses.
    protected final Executor executor;
    // Used for component lookup in the constructor and for handleInjection() on operations.
    protected final ComponentRegistry registry;
    // Looked up from the registry in the constructor; not used by the code in this class.
    protected final PartitionHandlingManager partition;
    // Maps a key to its segment.
    protected final KeyPartitioner keyPartitioner;
    // Used to wait for the next topology before retrying lost segments.
    protected final StateTransferLock stateTransferLock;
    // Forwarded unchanged to the ClusterStreamManager when starting remote operations.
    protected final boolean includeLoader;
    // Extracts the key from an element; may be null (see nonNullKeyFunction()).
    protected final Function<? super Original, ?> toKeyFunction;
    // Handler(s) run by close(); null until onClose() registers one.
    protected Runnable closeRunnable = null;
    // Tri-state: null is treated as true by getParallelDistribution().
    protected Boolean parallelDistribution;
    // Whether the local stream is made parallel; toggled by parallel()/sequential().
    protected boolean parallel;
    // Selects the rehash-aware variant in performOperation().
    protected boolean rehashAware = true;
    // Optional key filter; applied to the local stream only when non-null.
    protected Set<?> keysToFilter;
    // Optional segment filter; null appears to mean "no segment restriction".
    protected IntSet segmentsToFilter;
    // Stored from the constructor; not used by the code in this class.
    protected int distributedBatchSize;
    // Not used by the code in this class; presumably invoked by subclasses.
    protected Consumer<Supplier<PrimitiveIterator.OfInt>> segmentCompletionListener;
    // Strategy used for iteration; defaults to NO_MAP.
    protected IteratorOperation iteratorOperation = IteratorOperation.NO_MAP;
    // Timeout (value and unit) applied when awaiting remote completion and topology changes.
    protected long timeout = 30L;
    protected TimeUnit timeoutUnit = TimeUnit.SECONDS;

    /**
     * Creates a stream with an empty intermediate-operation pipeline.
     * <p>
     * The partition handling manager, key partitioner and state transfer lock are looked up
     * from {@code registry}, so {@code registry} must not be {@code null}.
     *
     * @param localAddress         stored in {@link #localAddress}
     * @param parallel             initial value of {@link #parallel}
     * @param dm                   stored in {@link #dm}
     * @param supplier             stored in {@link #supplier}
     * @param csm                  stored in {@link #csm}
     * @param includeLoader        stored in {@link #includeLoader}
     * @param distributedBatchSize stored in {@link #distributedBatchSize}
     * @param executor             stored in {@link #executor}
     * @param registry             stored in {@link #registry} and used for component lookup
     * @param toKeyFunction        stored in {@link #toKeyFunction}; may be {@code null}
     */
    protected AbstractCacheStream(Address localAddress, boolean parallel, DistributionManager dm, Supplier<CacheStream<Original>> supplier, ClusterStreamManager<Original, Object> csm, boolean includeLoader, int distributedBatchSize, Executor executor, ComponentRegistry registry, Function<? super Original, ?> toKeyFunction) {
        // Every new stream starts with an empty pipeline.
        this.intermediateOperations = new ArrayDeque<>();

        // Collaborators handed in by the caller.
        this.localAddress = localAddress;
        this.dm = dm;
        this.supplier = supplier;
        this.csm = csm;
        this.executor = executor;
        this.registry = registry;
        this.toKeyFunction = toKeyFunction;

        // Plain configuration values.
        this.parallel = parallel;
        this.includeLoader = includeLoader;
        this.distributedBatchSize = distributedBatchSize;

        // Components resolved through the registry.
        this.partition = registry.getComponent(PartitionHandlingManager.class);
        this.keyPartitioner = registry.getComponent(KeyPartitioner.class);
        this.stateTransferLock = registry.getComponent(StateTransferLock.class);
    }

    /**
     * Copy constructor: takes over every field of {@code other}.
     * <p>
     * The intermediate-operation queue is shared by reference, not copied, so operations
     * added through either instance are visible to both.
     *
     * @param other the stream whose state is adopted; must not be {@code null}
     */
    protected AbstractCacheStream(AbstractCacheStream<Original, T, S, S2> other) {
        // Immutable collaborators.
        this.intermediateOperations = other.intermediateOperations;
        this.localAddress = other.localAddress;
        this.dm = other.dm;
        this.supplier = other.supplier;
        this.csm = other.csm;
        this.executor = other.executor;
        this.registry = other.registry;
        this.partition = other.partition;
        this.keyPartitioner = other.keyPartitioner;
        this.stateTransferLock = other.stateTransferLock;
        this.includeLoader = other.includeLoader;
        this.toKeyFunction = other.toKeyFunction;

        // Mutable stream configuration.
        this.closeRunnable = other.closeRunnable;
        this.parallelDistribution = other.parallelDistribution;
        this.parallel = other.parallel;
        this.rehashAware = other.rehashAware;
        this.keysToFilter = other.keysToFilter;
        this.segmentsToFilter = other.segmentsToFilter;
        this.distributedBatchSize = other.distributedBatchSize;
        this.segmentCompletionListener = other.segmentCompletionListener;
        this.iteratorOperation = other.iteratorOperation;
        this.timeoutUnit = other.timeoutUnit;
        this.timeout = other.timeout;
    }

    /**
     * Returns the logger used by this class for its trace messages.
     *
     * @return the logger supplied by the concrete subclass
     */
    protected abstract Log getLog();

    /**
     * Injects components into a type-preserving intermediate operation, queues it on this
     * stream's pipeline and returns the stream for chaining.
     *
     * @param intermediateOperation the operation to append
     * @return the value of {@link #unwrap()}
     */
    protected S2 addIntermediateOperation(IntermediateOperation<T, S, T, S> intermediateOperation) {
        intermediateOperation.handleInjection(registry);
        addIntermediateOperation(intermediateOperations, intermediateOperation);
        return unwrap();
    }

    /**
     * Injects components into an intermediate operation whose output type may differ from
     * its input type, then queues it on this stream's pipeline.
     *
     * @param intermediateOperation the operation to append
     */
    protected void addIntermediateOperationMap(IntermediateOperation<T, S, ?, ?> intermediateOperation) {
        intermediateOperation.handleInjection(registry);
        addIntermediateOperation(intermediateOperations, intermediateOperation);
    }

    /**
     * Appends {@code intermediateOperation} to the given queue.
     *
     * @param intermediateOperations the queue to append to
     * @param intermediateOperation  the operation to append
     */
    protected void addIntermediateOperation(Queue<IntermediateOperation> intermediateOperations, IntermediateOperation<T, S, ?, ?> intermediateOperation) {
        final IntermediateOperation<T, S, ?, ?> toAppend = intermediateOperation;
        intermediateOperations.add(toAppend);
    }

    /**
     * Returns the stream object handed back to callers by the fluent methods of this class
     * (for example {@link #parallel()}, {@link #sequential()} and {@link #onClose(Runnable)}).
     *
     * @return the stream to continue chaining on
     */
    protected abstract S2 unwrap();

    /**
     * Returns the key-extraction function, falling back to
     * {@code StreamMarshalling.identity()} when none was configured.
     *
     * @return a non-{@code null} key function
     */
    protected Function<? super Original, ?> nonNullKeyFunction() {
        if (toKeyFunction != null) {
            return toKeyFunction;
        }
        return StreamMarshalling.identity();
    }

    /**
     * Reports the current value of the {@link #parallel} flag.
     *
     * @return {@code true} if this stream is currently marked parallel
     */
    @Override
    public boolean isParallel() {
        return parallel;
    }

    /**
     * Resolves the tri-state {@link #parallelDistribution} setting.
     *
     * @return {@code true} when the setting is unset ({@code null}) or explicitly true
     */
    boolean getParallelDistribution() {
        return parallelDistribution == null || parallelDistribution;
    }

    /**
     * Marks this stream as sequential.
     *
     * @return the value of {@link #unwrap()}
     */
    @Override
    public S2 sequential() {
        parallel = false;
        return unwrap();
    }

    /**
     * Marks this stream as parallel.
     *
     * @return the value of {@link #unwrap()}
     */
    @Override
    public S2 parallel() {
        parallel = true;
        return unwrap();
    }

    /**
     * No-op: changes no state and simply returns the stream.
     *
     * @return the value of {@link #unwrap()}
     */
    @Override
    public S2 unordered() {
        final S2 self = unwrap();
        return self;
    }

    /**
     * Registers a handler to run when {@link #close()} is called.
     * <p>
     * The first handler is stored as-is; each later handler is chained after the existing
     * one through {@code Util.composeWithExceptions}.
     *
     * @param closeHandler the handler to register
     * @return the value of {@link #unwrap()}
     */
    @Override
    public S2 onClose(Runnable closeHandler) {
        if (closeRunnable == null) {
            closeRunnable = closeHandler;
        } else {
            closeRunnable = Util.composeWithExceptions(closeRunnable, closeHandler);
        }
        return unwrap();
    }

    /**
     * Runs the registered close handler(s), if any were registered via
     * {@link #onClose(Runnable)}.
     */
    @Override
    public void close() {
        final Runnable handler = closeRunnable;
        if (handler == null) {
            return;
        }
        handler.run();
    }

    /**
     * Runs a terminal operation, choosing the rehash-aware or the plain variant depending
     * on {@link #rehashAware}.
     *
     * @param function                the terminal function applied to the stream
     * @param retryOnRehash           only used by the rehash-aware variant
     * @param accumulator             combines partial results into one value
     * @param earlyTerminatePredicate optional predicate that allows skipping the wait for
     *                                remote results; may be {@code null}
     * @param <R>                     result type
     * @return the accumulated result
     */
    <R> R performOperation(Function<? super S2, ? extends R> function, boolean retryOnRehash, BinaryOperator<R> accumulator, Predicate<? super R> earlyTerminatePredicate) {
        ResultsAccumulator<R> collected = new ResultsAccumulator<>(accumulator);
        if (!rehashAware) {
            return performOperation(function, collected, earlyTerminatePredicate);
        }
        return performOperationRehashAware(function, retryOnRehash, collected, earlyTerminatePredicate);
    }

    /**
     * Runs a terminal operation once against the write consistent hash, without any
     * rehash handling.
     * <p>
     * The remote part is started through the {@link ClusterStreamManager} first, then the
     * local part is executed on the calling thread and fed into {@code remoteResults}.
     * Unless the early-termination predicate accepts the local value, the method then
     * waits up to {@link #timeout} for the remote part. The operation is always forgotten
     * afterwards, whether or not it succeeded.
     *
     * @param function                the terminal function applied to the stream
     * @param remoteResults           accumulator receiving local and remote results
     * @param earlyTerminatePredicate optional predicate; when it accepts the local value the
     *                                wait for remote completion is skipped; may be {@code null}
     * @param <R>                     result type
     * @return the value accumulated in {@code remoteResults}
     * @throws org.infinispan.util.concurrent.TimeoutException if remote completion is not
     *         reported within the timeout
     * @throws CacheException if the calling thread is interrupted while waiting; the
     *         thread's interrupt status is restored before throwing
     */
    <R> R performOperation(Function<? super S2, ? extends R> function, ResultsAccumulator<R> remoteResults, Predicate<? super R> earlyTerminatePredicate) {
        ConsistentHash ch = this.dm.getWriteConsistentHash();
        SingleRunOperation op = new SingleRunOperation(this.intermediateOperations, this.supplierForSegments(ch, this.segmentsToFilter, null), function);
        Object id = this.csm.remoteStreamOperation(this.getParallelDistribution(), this.parallel, ch, (Set<Integer>)this.segmentsToFilter, this.keysToFilter, Collections.emptyMap(), this.includeLoader, this.toKeyFunction != null, op, remoteResults, earlyTerminatePredicate);
        try {
            // The operation is built from `function`, so its result is an R by construction.
            @SuppressWarnings("unchecked")
            R localValue = (R) op.performOperation();
            remoteResults.onCompletion(null, Collections.emptySet(), localValue);
            if (id != null) {
                try {
                    boolean terminatedEarly = earlyTerminatePredicate != null && earlyTerminatePredicate.test(localValue);
                    if (!terminatedEarly && !this.csm.awaitCompletion(id, this.timeout, this.timeoutUnit)) {
                        throw new org.infinispan.util.concurrent.TimeoutException();
                    }
                }
                catch (InterruptedException e) {
                    // Keep the interruption visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new CacheException((Throwable)e);
                }
            }
            this.getLog().tracef("Finished operation for id %s", id);
            return remoteResults.currentValue;
        }
        finally {
            this.csm.forgetOperation(id);
        }
    }

    /**
     * Runs a terminal operation and retries it for any segments reported lost, until no
     * lost segments remain.
     * <p>
     * Each round reads the current topology's read consistent hash, starts the remote part,
     * and (if the local node is a member) runs the local part. If the read consistent hash
     * is still the same afterwards, the local result is accepted for the local primary
     * segments; otherwise those segments are marked lost and the local value is discarded.
     * When lost segments exist, the method waits for the next topology id and repeats the
     * operation for just those segments.
     *
     * @param function                the terminal function applied to the stream
     * @param retryOnRehash           selects {@link SegmentRetryingOperation} instead of
     *                                {@link SingleRunOperation} for the operation
     * @param remoteResults           accumulator receiving results and lost segments
     * @param earlyTerminatePredicate optional predicate; when it accepts the local value the
     *                                wait for remote completion is skipped; may be {@code null}
     * @param <R>                     result type
     * @return the value accumulated in {@code remoteResults}
     * @throws org.infinispan.util.concurrent.TimeoutException if remote completion is not
     *         reported within the timeout
     * @throws CacheException if waiting is interrupted (the interrupt status is restored
     *         first) or if waiting for the next topology fails or times out
     */
    <R> R performOperationRehashAware(Function<? super S2, ? extends R> function, boolean retryOnRehash, ResultsAccumulator<R> remoteResults, Predicate<? super R> earlyTerminatePredicate) {
        IntSet segmentsToProcess = this.segmentsToFilter;
        do {
            LocalizedCacheTopology cacheTopology = this.dm.getCacheTopology();
            ConsistentHash ch = cacheTopology.getReadConsistentHash();
            BaseTerminalOperation op = retryOnRehash ? new SegmentRetryingOperation(this.intermediateOperations, this.supplierForSegments(ch, segmentsToProcess, null), function) : new SingleRunOperation(this.intermediateOperations, this.supplierForSegments(ch, segmentsToProcess, null), function);
            Object id = this.csm.remoteStreamOperationRehashAware(this.getParallelDistribution(), this.parallel, ch, (Set<Integer>)segmentsToProcess, this.keysToFilter, Collections.emptyMap(), this.includeLoader, this.toKeyFunction != null, op, remoteResults, earlyTerminatePredicate);
            try {
                boolean localRun = ch.getMembers().contains(this.localAddress);
                R localValue = null;
                if (localRun) {
                    // The operation is built from `function`, so its result is an R by construction.
                    @SuppressWarnings("unchecked")
                    R computed = (R) op.performOperation();
                    localValue = computed;
                    // Check for a topology change before deciding what to do with the local result.
                    boolean hashUnchanged = this.dm.getReadConsistentHash().equals(ch);
                    Set<Integer> ourSegments = ch.getPrimarySegmentsForOwner(this.localAddress);
                    if (segmentsToProcess != null) {
                        ourSegments.retainAll((Collection<?>)segmentsToProcess);
                    }
                    if (hashUnchanged) {
                        remoteResults.onCompletion(null, ourSegments, localValue);
                    } else {
                        // The hash moved underneath us: the local result cannot be trusted.
                        remoteResults.onSegmentsLost(ourSegments);
                    }
                }
                if (id != null) {
                    try {
                        boolean terminatedEarly = localRun && earlyTerminatePredicate != null && earlyTerminatePredicate.test(localValue);
                        if (!terminatedEarly && !this.csm.awaitCompletion(id, this.timeout, this.timeoutUnit)) {
                            throw new org.infinispan.util.concurrent.TimeoutException();
                        }
                    }
                    catch (InterruptedException e) {
                        // Keep the interruption visible to callers further up the stack.
                        Thread.currentThread().interrupt();
                        throw new CacheException((Throwable)e);
                    }
                }
            }
            finally {
                this.csm.forgetOperation(id);
            }
            if (!remoteResults.lostSegments.isEmpty()) {
                // Retry only what was lost, and only once the next topology is installed.
                segmentsToProcess = SmallIntSet.from(remoteResults.lostSegments);
                remoteResults.lostSegments.clear();
                this.getLog().tracef("Found %s lost segments for identifier %s", segmentsToProcess, id);
                try {
                    int nextTopology = cacheTopology.getTopologyId() + 1;
                    this.getLog().tracef("Waiting for topology %d to continue stream operation with segments %s", nextTopology, segmentsToProcess);
                    this.stateTransferLock.topologyFuture(nextTopology).get(this.timeout, this.timeoutUnit);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CacheException((Throwable)e);
                }
                catch (ExecutionException | TimeoutException e) {
                    throw new CacheException((Throwable)e);
                }
            } else {
                // Nothing lost: clearing the set ends the loop.
                segmentsToProcess = null;
                this.getLog().tracef("Finished rehash aware operation for id %s", id);
            }
        } while (segmentsToProcess != null && !segmentsToProcess.isEmpty());
        return remoteResults.currentValue;
    }

    /**
     * Runs a key-tracking terminal operation and retries it for any segments reported
     * lost, until a round finishes without lost segments.
     * <p>
     * A {@link KeyTrackingConsumer} records keys per segment. On each round the keys
     * already recorded for the local primary segments are passed to the local stream as
     * excluded keys. If the read consistent hash changed while the local part ran, the
     * local primary segments are marked lost but the local keys are still recorded as an
     * intermediate result.
     *
     * @param function builds the key-tracking operation from the local stream supplier
     * @throws org.infinispan.util.concurrent.TimeoutException if remote completion is not
     *         reported within the timeout
     * @throws CacheException if waiting is interrupted (the interrupt status is restored
     *         first) or if waiting for the next topology fails or times out
     */
    void performRehashKeyTrackingOperation(Function<Supplier<Stream<Original>>, KeyTrackingTerminalOperation<Original, Object, ? extends T>> function) {
        ConsistentHash segmentInfoCH = this.dm.getReadConsistentHash();
        KeyTrackingConsumer<Object> results = new KeyTrackingConsumer<Object>(this.keyPartitioner, segmentInfoCH.getNumSegments());
        // Without an explicit filter every segment of the hash has to be processed.
        IntSet segmentsToProcess = this.segmentsToFilter == null ? new RangeSet(segmentInfoCH.getNumSegments()) : this.segmentsToFilter;
        boolean complete = false;
        do {
            LocalizedCacheTopology cacheTopology = this.dm.getCacheTopology();
            ConsistentHash ch = cacheTopology.getReadConsistentHash();
            boolean localRun = ch.getMembers().contains(this.localAddress);
            Set<Integer> segments;
            Set<Object> excludedKeys;
            if (localRun) {
                segments = ch.getPrimarySegmentsForOwner(this.localAddress);
                segments.retainAll((Collection<?>)segmentsToProcess);
                // Keys already recorded for these segments must not be processed again.
                excludedKeys = segments.stream().flatMap(s -> results.referenceArray.get((int)s).stream()).collect(Collectors.toSet());
            } else {
                segments = null;
                excludedKeys = Collections.emptySet();
            }
            KeyTrackingTerminalOperation<Original, Object, ? extends T> op = function.apply(this.supplierForSegments(ch, segmentsToProcess, excludedKeys));
            op.handleInjection(this.registry);
            Object id = this.csm.remoteStreamOperationRehashAware(this.getParallelDistribution(), this.parallel, ch, (Set<Integer>)segmentsToProcess, this.keysToFilter, new AtomicReferenceArrayToMap(results.referenceArray), this.includeLoader, this.toKeyFunction != null, op, results);
            try {
                if (localRun) {
                    Collection<Object> localValue = op.performForEachOperation(results);
                    if (this.dm.getReadConsistentHash().equals(ch)) {
                        this.getLog().tracef("Found local values %s for id %s", localValue.size(), id);
                        results.onCompletion((Address)null, segments, localValue);
                    } else {
                        Set<Integer> ourSegments = ch.getPrimarySegmentsForOwner(this.localAddress);
                        ourSegments.retainAll((Collection<?>)segmentsToProcess);
                        this.getLog().tracef("CH changed - making %s segments suspect for identifier %s", ourSegments, id);
                        results.onSegmentsLost(ourSegments);
                        // Still remember the keys so they are excluded on the retry.
                        results.onIntermediateResult((Address)null, localValue);
                    }
                }
                if (id != null) {
                    try {
                        if (!this.csm.awaitCompletion(id, this.timeout, this.timeoutUnit)) {
                            throw new org.infinispan.util.concurrent.TimeoutException();
                        }
                    }
                    catch (InterruptedException e) {
                        // Keep the interruption visible to callers further up the stack.
                        Thread.currentThread().interrupt();
                        throw new CacheException((Throwable)e);
                    }
                }
            }
            finally {
                this.csm.forgetOperation(id);
            }
            if (!results.lostSegments.isEmpty()) {
                // Retry only what was lost, and only once the next topology is installed.
                segmentsToProcess = SmallIntSet.from(results.lostSegments);
                results.lostSegments.clear();
                this.getLog().tracef("Found %s lost segments for identifier %s", segmentsToProcess, id);
                try {
                    int nextTopology = cacheTopology.getTopologyId() + 1;
                    this.getLog().tracef("Waiting for topology %d to continue key tracking operation with segments %s", nextTopology, segmentsToProcess);
                    this.stateTransferLock.topologyFuture(nextTopology).get(this.timeout, this.timeoutUnit);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CacheException((Throwable)e);
                }
                catch (ExecutionException | TimeoutException e) {
                    throw new CacheException((Throwable)e);
                }
            } else {
                this.getLog().tracef("Finished rehash aware operation for id %s", id);
                complete = true;
            }
        } while (!complete);
    }

    /**
     * Tells whether the local node is the primary owner of {@code key}'s segment in the
     * given consistent hash.
     *
     * @param ch  the consistent hash to consult
     * @param key the key whose segment is looked up through the key partitioner
     * @return {@code true} if the local address equals the segment's primary owner
     */
    protected boolean isPrimaryOwner(ConsistentHash ch, Object key) {
        final Address self = localAddress;
        final int segment = keyPartitioner.getSegment(key);
        return self.equals(ch.locatePrimaryOwnerForSegment(segment));
    }

    /**
     * Convenience overload of
     * {@link #supplierForSegments(ConsistentHash, IntSet, Set, boolean)} that restricts the
     * stream to the local node's primary segments.
     *
     * @param ch             the consistent hash to consult
     * @param targetSegments segments to restrict to, or {@code null}
     * @param excludedKeys   keys to filter out, or {@code null}
     * @return a supplier of the narrowed local stream
     */
    protected Supplier<Stream<Original>> supplierForSegments(ConsistentHash ch, IntSet targetSegments, Set<Object> excludedKeys) {
        final boolean primaryOnly = true;
        return supplierForSegments(ch, targetSegments, excludedKeys, primaryOnly);
    }

    /**
     * Builds a supplier for the local stream, narrowed by segment, key filter and
     * excluded keys.
     * <p>
     * If the local node is not a member of {@code ch} the supplier yields an empty stream.
     * With {@code usePrimary} the segments are the local primary segments, intersected with
     * {@code targetSegments} when that is non-null; otherwise {@code targetSegments} is used
     * as given. The segment set is computed eagerly; the stream itself is created each time
     * the supplier is invoked.
     *
     * @param ch             the consistent hash to consult
     * @param targetSegments segments to restrict to, or {@code null}
     * @param excludedKeys   keys to filter out, or {@code null}
     * @param usePrimary     whether to restrict to the local primary segments
     * @return a supplier of the narrowed local stream
     */
    protected Supplier<Stream<Original>> supplierForSegments(ConsistentHash ch, IntSet targetSegments, Set<Object> excludedKeys, boolean usePrimary) {
        if (!ch.getMembers().contains(localAddress)) {
            return Stream::empty;
        }
        final IntSet segments;
        if (!usePrimary) {
            segments = targetSegments;
        } else {
            segments = SmallIntSet.from(ch.getPrimarySegmentsForOwner(localAddress));
            if (targetSegments != null) {
                segments.retainAll(targetSegments);
            }
        }
        return () -> {
            // A known-empty segment set means there is nothing to stream at all.
            boolean nothingToStream = segments != null && segments.isEmpty();
            if (nothingToStream) {
                return Stream.empty();
            }
            BaseCacheStream stream = supplier.get().filterKeySegments(segments);
            if (keysToFilter != null) {
                stream = stream.filterKeys((Set)keysToFilter);
            }
            boolean hasExclusions = excludedKeys != null && !excludedKeys.isEmpty();
            if (hasExclusions) {
                return stream.filter(e -> !excludedKeys.contains(toKeyFunction == null ? e : toKeyFunction.apply(e)));
            }
            if (parallel) {
                return stream.parallel();
            }
            return stream.sequential();
        };
    }

    /**
     * Chains two consumers so that {@code b} always runs after {@code a}, even when
     * {@code a} throws.
     * <p>
     * If {@code a} throws, {@code b} is still invoked; anything {@code b} throws is attached
     * to {@code a}'s throwable as a suppressed exception (failures to attach are ignored),
     * and {@code a}'s throwable is rethrown. If {@code a} completes normally, {@code b} runs
     * and any throwable from it propagates unchanged.
     *
     * @param a the consumer invoked first
     * @param b the consumer invoked second
     * @return the composed consumer
     */
    protected static Consumer<Supplier<PrimitiveIterator.OfInt>> composeWithExceptions(Consumer<Supplier<PrimitiveIterator.OfInt>> a, Consumer<Supplier<PrimitiveIterator.OfInt>> b) {
        return segmentSupplier -> {
            try {
                a.accept(segmentSupplier);
            }
            catch (Throwable primary) {
                // The first consumer failed: the second one still has to be notified.
                try {
                    b.accept(segmentSupplier);
                }
                catch (Throwable secondary) {
                    try {
                        primary.addSuppressed(secondary);
                    }
                    catch (Throwable ignored) {
                        // addSuppressed can itself refuse (e.g. self-suppression); the
                        // primary failure must still be the one reported.
                    }
                }
                throw primary;
            }
            b.accept(segmentSupplier);
        };
    }

    /**
     * Externalizer for {@link MapHandler} and {@link FlatMapHandler}.
     * <p>
     * Wire format: one type byte ({@link #MAP} or {@link #FLATMAP}), followed by the
     * handler's intermediate operations and then its key function, both written with
     * {@code writeObject}.
     */
    public static class MapOpsExternalizer
    extends AbstractExternalizer<IntermediateOperation> {
        /** Type byte identifying a {@link MapHandler}. */
        static final int MAP = 0;
        /** Type byte identifying a {@link FlatMapHandler}. */
        static final int FLATMAP = 1;
        // Maps each supported handler class to its type byte.
        private final IdentityIntMap<Class<?>> numbers = new IdentityIntMap<>(2);

        public MapOpsExternalizer() {
            this.numbers.put(MapHandler.class, MAP);
            this.numbers.put(FlatMapHandler.class, FLATMAP);
        }

        /**
         * @return the externalizer id, {@code 113}
         */
        public Integer getId() {
            return 113;
        }

        /**
         * @return the handler classes this externalizer can marshal
         */
        public Set<Class<? extends IntermediateOperation>> getTypeClasses() {
            return Util.asSet(MapHandler.class, FlatMapHandler.class);
        }

        /**
         * Writes the type byte followed by the handler's state.
         *
         * @throws IllegalArgumentException if {@code object} is not a supported handler;
         *         in that case nothing is written to {@code output}
         */
        public void writeObject(ObjectOutput output, IntermediateOperation object) throws IOException {
            int number = this.numbers.get(object.getClass(), -1);
            switch (number) {
                case MAP:
                case FLATMAP: {
                    // Emit the type byte only once the class is known to be supported, so
                    // a rejected object leaves no stray byte in the stream.
                    output.write(number);
                    // FlatMapHandler extends MapHandler, so one cast covers both.
                    MapHandler handler = (MapHandler)object;
                    output.writeObject(handler.intermediateOperations);
                    output.writeObject(handler.toKeyFunction);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unsupported number " + number + " found for class: " + object.getClass());
                }
            }
        }

        /**
         * Reads the type byte and rebuilds the matching handler.
         *
         * @throws IllegalArgumentException if the type byte is not a known one
         */
        public IntermediateOperation readObject(ObjectInput input) throws IOException, ClassNotFoundException {
            int number = input.readUnsignedByte();
            switch (number) {
                case MAP: {
                    return new MapHandler((Iterable)input.readObject(), (Function)input.readObject());
                }
                case FLATMAP: {
                    return new FlatMapHandler((Iterable)input.readObject(), (Function)input.readObject());
                }
            }
            throw new IllegalArgumentException("Unsupported number " + number + " found!");
        }
    }

    /**
     * Variant of {@link MapHandler} for pipelines that contain a flat-mapping operation.
     * <p>
     * Operations before the first {@link FlatMappingOperation} are applied to the entry
     * stream directly. The flat-mapping operation's {@code map} turns each entry into an
     * inner stream; all operations after it are applied to each inner stream, whose
     * elements are collected into a list and paired with the entry's key.
     */
    static class FlatMapHandler<OutputType, OutputStream extends BaseStream<OutputType, OutputStream>>
    extends MapHandler<OutputType, OutputStream> {
        FlatMapHandler(Iterable<IntermediateOperation> intermediateOperations, Function<Object, ?> toKeyFunction) {
            super(intermediateOperations, toKeyFunction);
        }

        @Override
        public OutputStream perform(Stream<Object> cacheEntryStream) {
            // Holds the key of the entry currently flowing through the pipeline.
            ByRef currentKey = new ByRef(null);
            Stream<Object> pipeline = cacheEntryStream.peek(entry -> currentKey.set(toKeyFunction.apply(entry)));
            Iterator ops = intermediateOperations.iterator();
            while (ops.hasNext()) {
                IntermediateOperation op = (IntermediateOperation)ops.next();
                if (!(op instanceof FlatMappingOperation)) {
                    pipeline = op.perform(pipeline);
                    continue;
                }
                // Everything after the flat map is applied to each inner stream instead;
                // draining the iterator here also ends the outer loop.
                ArrayList<IntermediateOperation> trailingOps = new ArrayList<>();
                ops.forEachRemaining(trailingOps::add);
                Stream perEntryStreams = ((FlatMappingOperation)op).map(pipeline);
                pipeline = perEntryStreams.map(inner -> {
                    BaseStream current = (BaseStream)inner;
                    for (IntermediateOperation trailing : trailingOps) {
                        current = trailing.perform(current);
                    }
                    return new KeyValuePair(currentKey.get(), ((Stream)current).collect(Collectors.toList()));
                });
            }
            return (OutputStream)pipeline;
        }
    }

    /**
     * Intermediate operation that wraps a whole pipeline and pairs each resulting element
     * with the key of the cache entry it came from.
     * <p>
     * The key is captured with a {@code peek} on the incoming entry stream and read back
     * when the final element is mapped into a {@link KeyValuePair}.
     */
    static class MapHandler<OutputType, OutputStream extends BaseStream<OutputType, OutputStream>>
    implements IntermediateOperation<Object, Stream<Object>, OutputType, OutputStream> {
        final Iterable<IntermediateOperation> intermediateOperations;
        final Function<Object, ?> toKeyFunction;

        MapHandler(Iterable<IntermediateOperation> intermediateOperations, Function<Object, ?> toKeyFunction) {
            this.toKeyFunction = toKeyFunction;
            this.intermediateOperations = intermediateOperations;
        }

        @Override
        public OutputStream perform(Stream<Object> cacheEntryStream) {
            // Holds the key of the entry currently flowing through the pipeline.
            ByRef currentKey = new ByRef(null);
            Stream<Object> pipeline = cacheEntryStream.peek(entry -> currentKey.set(toKeyFunction.apply(entry)));
            Iterator<IntermediateOperation> ops = intermediateOperations.iterator();
            while (ops.hasNext()) {
                IntermediateOperation op = ops.next();
                pipeline = op.perform(pipeline);
            }
            return (OutputStream)pipeline.map(result -> new KeyValuePair<Object, Object>(currentKey.get(), result));
        }
    }

    /**
     * Strategy describing how the intermediate operations are prepared for iteration and
     * how keys are extracted from the published elements.
     */
    static enum IteratorOperation {
        /** Elements are published as-is; the key is derived with the supplied key function. */
        NO_MAP{

            @Override
            public Iterable<IntermediateOperation> prepareForIteration(Iterable<IntermediateOperation> intermediateOperations, Function<Object, ?> toKeyFunction) {
                return intermediateOperations;
            }

            @Override
            public <V> Publisher<V> handlePublisher(Publisher<V> publisher, Consumer<Object> keyConsumer, Function<V, ?> toKeyFunction) {
                return Flowable.fromPublisher(publisher).doOnNext(element -> keyConsumer.accept(toKeyFunction.apply(element)));
            }
        }
        ,
        /** The pipeline is wrapped in a {@link MapHandler}; elements are key/value pairs. */
        MAP{

            @Override
            public <In, Out> Function<In, Out> getFunction() {
                return pair -> ((KeyValuePair)pair).getValue();
            }

            @Override
            public Iterable<IntermediateOperation> prepareForIteration(Iterable<IntermediateOperation> intermediateOperations, Function<Object, ?> toKeyFunction) {
                return Collections.singletonList(new MapHandler(intermediateOperations, toKeyFunction));
            }
        }
        ,
        /** The pipeline is wrapped in a {@link FlatMapHandler}; each pair's value is an iterable that gets flattened. */
        FLAT_MAP{

            @Override
            public Iterable<IntermediateOperation> prepareForIteration(Iterable<IntermediateOperation> intermediateOperations, Function<Object, ?> toKeyFunction) {
                return Collections.singletonList(new FlatMapHandler(intermediateOperations, toKeyFunction));
            }

            @Override
            public <V> Publisher<V> handlePublisher(Publisher<V> publisher, Consumer<Object> keyConsumer, Function<V, ?> toKeyFunction) {
                return this.flowableFromPublisher(publisher, keyConsumer).flatMap(pair -> Flowable.fromIterable((Iterable)((KeyValuePair)pair).getValue()));
            }
        };


        /**
         * @return a function unwrapping published elements, or {@code null} when the
         *         constant does not define one
         */
        public <In, Out> Function<In, Out> getFunction() {
            return null;
        }

        /**
         * Adapts the intermediate operations for iteration.
         *
         * @param var1 the stream's intermediate operations
         * @param var2 the key-extraction function
         * @return the operations to apply when iterating
         */
        public abstract Iterable<IntermediateOperation> prepareForIteration(Iterable<IntermediateOperation> var1, Function<Object, ?> var2);

        /**
         * Decorates {@code publisher} so that {@code keyConsumer} sees the key of every
         * element. By default the elements are expected to be {@link KeyValuePair}s.
         */
        public <V> Publisher<V> handlePublisher(Publisher<V> publisher, Consumer<Object> keyConsumer, Function<V, ?> toKeyFunction) {
            return this.flowableFromPublisher(publisher, keyConsumer);
        }

        /**
         * Wraps {@code publisher} in a {@link Flowable} that reports the key of each
         * {@link KeyValuePair} element to {@code keyConsumer}.
         */
        protected <V> Flowable<V> flowableFromPublisher(Publisher<V> publisher, Consumer<Object> keyConsumer) {
            return Flowable.fromPublisher(publisher).doOnNext(pair -> keyConsumer.accept(((KeyValuePair)pair).getKey()));
        }
    }

    /**
     * Results callback that folds every non-null partial result into a single value with a
     * {@link BinaryOperator} and collects the segments reported as lost.
     */
    static class ResultsAccumulator<R>
    implements ClusterStreamManager.ResultsCallback<R> {
        private final BinaryOperator<R> binaryOperator;
        private final Set<Integer> lostSegments = new ConcurrentHashSet();
        // Updated only while holding this object's monitor.
        R currentValue;

        ResultsAccumulator(BinaryOperator<R> binaryOperator) {
            this.binaryOperator = binaryOperator;
        }

        /**
         * Merges {@code results} into the running value; {@code null} results are ignored.
         *
         * @return always {@code null}
         */
        @Override
        public Set<Integer> onIntermediateResult(Address address, R results) {
            if (results == null) {
                return null;
            }
            synchronized (this) {
                if (currentValue == null) {
                    // First contribution: nothing to combine with yet.
                    currentValue = results;
                } else {
                    currentValue = binaryOperator.apply(currentValue, results);
                }
            }
            return null;
        }

        /** Treats a completion exactly like an intermediate result. */
        @Override
        public void onCompletion(Address address, Set<Integer> completedSegments, R results) {
            onIntermediateResult(address, results);
        }

        /** Records every given segment as lost. */
        @Override
        public void onSegmentsLost(Set<Integer> segments) {
            for (Integer lost : segments) {
                lostSegments.add(lost);
            }
        }
    }

    /**
     * Results callback that records received keys per segment and collects the segments
     * reported as lost.
     * <p>
     * Each segment starts with an empty key set. Completing a segment replaces its set
     * with {@code null}, after which keys for that segment are no longer recorded.
     */
    class KeyTrackingConsumer<K>
    implements ClusterStreamManager.ResultsCallback<Collection<K>>,
    KeyTrackingTerminalOperation.IntermediateCollector<Collection<K>> {
        final KeyPartitioner keyPartitioner;
        final Set<Integer> lostSegments = new ConcurrentHashSet();
        final AtomicReferenceArray<Set<K>> referenceArray;

        KeyTrackingConsumer(KeyPartitioner keyPartitioner, int numSegments) {
            this.keyPartitioner = keyPartitioner;
            this.referenceArray = new AtomicReferenceArray(numSegments);
            int slots = this.referenceArray.length();
            for (int segment = 0; segment < slots; segment++) {
                this.referenceArray.set(segment, new HashSet());
            }
        }

        /**
         * Records each key under its segment, skipping segments already completed.
         *
         * @return always {@code null}
         */
        @Override
        public Set<Integer> onIntermediateResult(Address address, Collection<K> results) {
            if (results == null) {
                return null;
            }
            AbstractCacheStream.this.getLog().tracef("Response from %s with results %s", address, results.size());
            results.forEach(key -> {
                Set<K> tracked = referenceArray.get(keyPartitioner.getSegment(key));
                // A null slot means the segment was completed: nothing left to track.
                if (tracked != null) {
                    tracked.add(key);
                }
            });
            return null;
        }

        /** Clears the key sets of the completed segments, then records the results. */
        @Override
        public void onCompletion(Address address, Set<Integer> completedSegments, Collection<K> results) {
            if (completedSegments.isEmpty()) {
                AbstractCacheStream.this.getLog().tracef("No segments to complete from %s", address);
            } else {
                AbstractCacheStream.this.getLog().tracef("Completing segments %s", completedSegments);
                completedSegments.forEach(done -> referenceArray.set((int)done, (Set<K>)null));
            }
            onIntermediateResult(address, results);
        }

        /** Records every given segment as lost. */
        @Override
        public void onSegmentsLost(Set<Integer> segments) {
            for (Integer lost : segments) {
                lostSegments.add(lost);
            }
        }

        /** Records a locally produced batch of keys as an intermediate result. */
        @Override
        public void sendDataResonse(Collection<K> response) {
            onIntermediateResult((Address)null, response);
        }
    }

    /**
     * Read-only {@link Map} view over an {@link AtomicReferenceArray}, keyed by array index.
     * <p>
     * Every in-range index counts as a contained key, even when its slot holds
     * {@code null}. {@code entrySet()}, {@code clear()} and {@code remove(key, value)}
     * are unsupported.
     */
    static class AtomicReferenceArrayToMap<R>
    extends AbstractMap<Integer, R> {
        final AtomicReferenceArray<R> array;

        AtomicReferenceArrayToMap(AtomicReferenceArray<R> array) {
            this.array = array;
        }

        // True when the candidate is an Integer that is a valid index into the array.
        private boolean isValidIndex(Object candidate) {
            if (candidate instanceof Integer) {
                int index = (Integer)candidate;
                return index >= 0 && index < array.length();
            }
            return false;
        }

        @Override
        public boolean containsKey(Object o) {
            return isValidIndex(o);
        }

        @Override
        public R get(Object key) {
            return isValidIndex(key) ? array.get((Integer)key) : null;
        }

        @Override
        public int size() {
            return array.length();
        }

        @Override
        public boolean remove(Object key, Object value) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void clear() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Set<Map.Entry<Integer, R>> entrySet() {
            throw new UnsupportedOperationException();
        }
    }
}

