/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.infinispan.AdvancedCache;
import org.infinispan.CacheSet;
import org.infinispan.InternalCacheSet;
import org.infinispan.commands.AbstractTopologyAffectedCommand;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.KeySetCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.IracPutKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.impl.EntryFactory;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.InequalVersionComparisonResult;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.InvocationStage;
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.reactivestreams.Publisher;

public class PrefetchInterceptor<K, V>
extends DDAsyncInterceptor {
    protected static final Log log = LogFactory.getLog(PrefetchInterceptor.class);
    protected static final long STATE_TRANSFER_FLAGS = FlagBitSets.PUT_FOR_STATE_TRANSFER | FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.IGNORE_RETURN_VALUES | FlagBitSets.SKIP_REMOTE_LOOKUP | FlagBitSets.SKIP_SHARED_CACHE_STORE | FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.SKIP_XSITE_BACKUP;
    @Inject
    protected ScatteredVersionManager<K> svm;
    @Inject
    protected DistributionManager dm;
    @Inject
    protected KeyPartitioner keyPartitioner;
    @Inject
    protected CommandsFactory commandsFactory;
    @Inject
    protected RpcManager rpcManager;
    @Inject
    protected ComponentRef<AdvancedCache<K, V>> cache;
    @Inject
    protected EntryFactory entryFactory;
    @Inject
    protected InternalDataContainer<K, V> dataContainer;
    protected int numSegments;
    private final InvocationSuccessFunction<VisitableCommand> handleRemotelyPrefetchedEntry = this::handleRemotelyPrefetchedEntry;

    @Start
    public void start() {
        this.numSegments = this.cacheConfiguration.clustering().hash().numSegments();
    }

    private boolean canRetrieveRemoteValue(FlagAffectedCommand command) {
        return !command.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK);
    }

    protected Object handleReadCommand(InvocationContext ctx, DataCommand command) throws Throwable {
        if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
            ctx.removeLookedUpEntry(command.getKey());
        }
        if (this.canRetrieveRemoteValue(command)) {
            return this.prefetchKeyIfNeededAndInvokeNext(ctx, command, command.getKey(), false);
        }
        return this.invokeNext(ctx, command);
    }

    private Object prefetchKeyIfNeededAndInvokeNext(InvocationContext ctx, DataCommand command, Object key, boolean isWrite) {
        int segment = command.getSegment();
        switch (this.svm.getSegmentState(segment)) {
            case NOT_OWNED: {
                break;
            }
            case BLOCKED: {
                if (isWrite) {
                    return PrefetchInterceptor.asyncValue(this.svm.getBlockingFuture(segment)).thenApply(ctx, command, (rCtx, rCommand, ignored) -> this.prefetchKeyIfNeededAndInvokeNext(ctx, command, key, true));
                }
            }
            case KEY_TRANSFER: 
            case VALUE_TRANSFER: {
                InvocationStage nextStage = this.lookupLocalAndRetrieveRemote(ctx, key, command, segment);
                if (nextStage == null) break;
                return this.asyncInvokeNext(ctx, (VisitableCommand)command, nextStage);
            }
            case OWNED: {
                break;
            }
            default: {
                throw new IllegalStateException();
            }
        }
        return this.invokeNext(ctx, command);
    }

    private <C extends VisitableCommand & TopologyAffectedCommand> Object prefetchKeysIfNeededAndInvokeNext(InvocationContext ctx, C command, Collection<?> keys, boolean isWrite) {
        BitSet blockedSegments = null;
        ArrayList transferedKeys = null;
        block6: for (Object key : keys) {
            int segment = this.keyPartitioner.getSegment(key);
            switch (this.svm.getSegmentState(segment)) {
                case NOT_OWNED: {
                    continue block6;
                }
                case BLOCKED: {
                    if (isWrite) {
                        if (blockedSegments == null) {
                            blockedSegments = new BitSet(this.numSegments);
                        }
                        blockedSegments.set(segment);
                    }
                }
                case KEY_TRANSFER: 
                case VALUE_TRANSFER: {
                    if (transferedKeys == null) {
                        transferedKeys = new ArrayList(keys.size());
                    }
                    transferedKeys.add(key);
                }
                case OWNED: {
                    continue block6;
                }
            }
            throw new IllegalStateException();
        }
        if (blockedSegments != null) {
            CompletableFuture<Void> blockingFuture = CompletableFuture.allOf((CompletableFuture[])blockedSegments.stream().mapToObj(this.svm::getBlockingFuture).toArray(CompletableFuture[]::new));
            return PrefetchInterceptor.asyncValue(blockingFuture).thenApply(ctx, command, (rCtx, rCommand, rv) -> this.prefetchKeysIfNeededAndInvokeNext(rCtx, rCommand, keys, true));
        }
        if (transferedKeys != null) {
            return this.asyncInvokeNext(ctx, command, this.retrieveRemoteValues(ctx, command, transferedKeys));
        }
        return this.invokeNext(ctx, command);
    }

    private InvocationStage lookupLocalAndRetrieveRemote(InvocationContext ctx, Object key, DataCommand cmd, int segment) {
        Metadata metadata;
        InternalCacheEntry<K, V> entry = this.dataContainer.peek(segment, key);
        if (log.isTraceEnabled()) {
            log.tracef("Locally prefetched entry %s", entry);
        }
        Metadata metadata2 = metadata = entry != null ? entry.getMetadata() : null;
        if (metadata != null && metadata.version() != null && this.svm.isVersionActual(segment, metadata.version())) {
            this.entryFactory.wrapExternalEntry(ctx, key, entry, true, true);
            return null;
        }
        if (metadata instanceof RemoteMetadata && this.svm.getSegmentState(segment) == ScatteredVersionManager.SegmentState.VALUE_TRANSFER) {
            Address backup = ((RemoteMetadata)metadata).getAddress();
            return this.retrieveRemoteValue(ctx, Collections.singleton(backup), key, segment, cmd);
        }
        return this.retrieveRemoteValue(ctx, null, key, segment, cmd);
    }

    private InvocationStage retrieveRemoteValue(InvocationContext ctx, Collection<Address> targets, Object key, int segment, DataCommand dataCommand) {
        if (log.isTraceEnabled()) {
            log.tracef("Prefetching entry for key %s from %s", key, targets);
        }
        ClusteredGetCommand command = this.commandsFactory.buildClusteredGetCommand(key, segment, FlagBitSets.SKIP_OWNERSHIP_CHECK);
        command.setTopologyId(dataCommand.getTopologyId());
        CompletionStage<Map<Address, Response>> remoteInvocation = targets != null ? this.rpcManager.invokeCommand(targets, (ReplicableCommand)command, MapResponseCollector.ignoreLeavers(targets.size()), this.rpcManager.getSyncRpcOptions()) : this.rpcManager.invokeCommandOnAll(command, MapResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions());
        return PrefetchInterceptor.asyncValue(remoteInvocation).thenApplyMakeStage(ctx, dataCommand, this.handleRemotelyPrefetchedEntry);
    }

    private Object handleRemotelyPrefetchedEntry(InvocationContext ctx, VisitableCommand command, Object rv) {
        Map responseMap = (Map)rv;
        EntryVersion maxVersion = null;
        InternalCacheValue maxValue = null;
        for (Response response : responseMap.values()) {
            if (!response.isSuccessful()) {
                throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY;
            }
            SuccessfulResponse successfulResponse = (SuccessfulResponse)response;
            InternalCacheValue icv = (InternalCacheValue)successfulResponse.getResponseValue();
            if (icv == null) continue;
            Metadata metadata = icv.getMetadata();
            if (metadata instanceof RemoteMetadata) {
                throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY;
            }
            if (metadata == null || metadata.version() == null || maxVersion != null && maxVersion.compareTo(metadata.version()) != InequalVersionComparisonResult.BEFORE) continue;
            maxVersion = metadata.version();
            maxValue = icv;
        }
        if (log.isTraceEnabled()) {
            log.tracef("Prefetched value is %s", maxValue);
        }
        DataCommand dataCommand = (DataCommand)command;
        if (maxValue == null) {
            return null;
        }
        this.entryFactory.wrapExternalEntry(ctx, dataCommand.getKey(), maxValue.toInternalCacheEntry(dataCommand.getKey()), true, true);
        PutKeyValueCommand putKeyValueCommand = this.commandsFactory.buildPutKeyValueCommand(dataCommand.getKey(), maxValue.getValue(), dataCommand.getSegment(), new InternalMetadataImpl(maxValue), STATE_TRANSFER_FLAGS);
        putKeyValueCommand.setTopologyId(dataCommand.getTopologyId());
        return this.invokeNext(ctx, putKeyValueCommand);
    }

    private <C extends VisitableCommand & TopologyAffectedCommand> InvocationStage retrieveRemoteValues(InvocationContext ctx, C originCommand, List<?> keys) {
        if (log.isTraceEnabled()) {
            log.tracef("Prefetching entries for keys %s using broadcast", keys);
        }
        ClusteredGetAllCommand command = this.commandsFactory.buildClusteredGetAllCommand(keys, FlagBitSets.SKIP_OWNERSHIP_CHECK, null);
        command.setTopologyId(((TopologyAffectedCommand)originCommand).getTopologyId());
        CompletionStage<Map<Address, Response>> rpcFuture = this.rpcManager.invokeCommandOnAll(command, MapResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions());
        return PrefetchInterceptor.asyncValue(rpcFuture).thenApplyMakeStage(ctx, originCommand, (rCtx, topologyAffectedCommand, rv) -> {
            Map responseMap = (Map)rv;
            InternalCacheValue[] maxValues = new InternalCacheValue[keys.size()];
            for (Response response : responseMap.values()) {
                if (!response.isSuccessful()) {
                    throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY;
                }
                InternalCacheValue[] values = (InternalCacheValue[])((SuccessfulResponse)response).getResponseValue();
                int i = 0;
                for (InternalCacheValue icv : values) {
                    if (icv != null) {
                        Metadata maxMetadata;
                        Metadata metadata = icv.getMetadata();
                        if (metadata instanceof RemoteMetadata) {
                            throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY;
                        }
                        if (maxValues[i] == null) {
                            maxValues[i] = icv;
                        } else if (metadata != null && metadata.version() != null && ((maxMetadata = maxValues[i].getMetadata()) == null || maxMetadata.version() == null || maxMetadata.version().compareTo(metadata.version()) == InequalVersionComparisonResult.BEFORE)) {
                            maxValues[i] = icv;
                        }
                    }
                    ++i;
                }
            }
            HashMap map = new HashMap(keys.size());
            for (int i = 0; i < maxValues.length; ++i) {
                if (maxValues[i] == null) continue;
                map.put(keys.get(i), maxValues[i]);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Prefetched values are %s", map);
            }
            if (map.isEmpty()) {
                return CompletableFutures.completedNull();
            }
            for (Map.Entry entry : map.entrySet()) {
                this.entryFactory.wrapExternalEntry(rCtx, entry.getKey(), ((InternalCacheValue)entry.getValue()).toInternalCacheEntry(entry.getKey()), true, true);
            }
            PutMapCommand putMapCommand = this.commandsFactory.buildPutMapCommand(map, null, STATE_TRANSFER_FLAGS);
            putMapCommand.setTopologyId(((TopologyAffectedCommand)((Object)topologyAffectedCommand)).getTopologyId());
            return this.invokeNext(rCtx, putMapCommand);
        });
    }

    protected Object handleReadManyCommand(InvocationContext ctx, AbstractTopologyAffectedCommand command, Collection<?> keys) throws Throwable {
        if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
            ctx.removeLookedUpEntries(keys);
        }
        if (this.canRetrieveRemoteValue(command)) {
            return this.prefetchKeysIfNeededAndInvokeNext(ctx, command, keys, false);
        }
        return this.invokeNext(ctx, command);
    }

    protected Object handleWriteCommand(InvocationContext ctx, DataWriteCommand command) {
        if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
            ctx.removeLookedUpEntry(command.getKey());
        }
        if (command.loadType() != VisitableCommand.LoadType.DONT_LOAD && this.canRetrieveRemoteValue(command)) {
            return this.prefetchKeyIfNeededAndInvokeNext(ctx, command, command.getKey(), true);
        }
        return this.invokeNext(ctx, command);
    }

    protected Object handleWriteManyCommand(InvocationContext ctx, WriteCommand command) throws Throwable {
        if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
            ctx.removeLookedUpEntries(command.getAffectedKeys());
        }
        if (command.loadType() != VisitableCommand.LoadType.DONT_LOAD && this.canRetrieveRemoteValue(command)) {
            return this.prefetchKeysIfNeededAndInvokeNext(ctx, command, command.getAffectedKeys(), true);
        }
        return this.invokeNext(ctx, command);
    }

    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitIracPutKeyValueCommand(InvocationContext ctx, IracPutKeyValueCommand command) {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return this.handleReadCommand(ctx, command);
    }

    @Override
    public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return this.handleReadCommand(ctx, command);
    }

    @Override
    public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
        return this.handleReadManyCommand(ctx, command, command.getKeys());
    }

    @Override
    public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable {
        return this.handleReadCommand(ctx, command);
    }

    @Override
    public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable {
        return this.handleReadManyCommand(ctx, command, command.getKeys());
    }

    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        return this.handleWriteManyCommand(ctx, command);
    }

    @Override
    public Object visitKeySetCommand(InvocationContext ctx, KeySetCommand command) throws Throwable {
        EntrySetCommand entrySetCommand = this.commandsFactory.buildEntrySetCommand(command.getFlagsBitSet());
        return this.invokeNextThenApply(ctx, entrySetCommand, (rCtx, rCommand, rv) -> {
            boolean ignoreOwnership = rCommand.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL);
            return new BackingKeySet(ignoreOwnership, (CacheSet)rv);
        });
    }

    @Override
    public Object visitEntrySetCommand(InvocationContext ctx, EntrySetCommand command) throws Throwable {
        return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
            boolean ignoreOwnership = rCommand.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.STATE_TRANSFER_PROGRESS);
            return new BackingEntrySet(ignoreOwnership, (CacheSet)rv);
        });
    }

    private Publisher<CacheEntry<K, V>> getPublisher(int segment, boolean ignoreOwnership, CacheSet<CacheEntry<K, V>> next) {
        if (ignoreOwnership) {
            return next.localPublisher(segment);
        }
        ScatteredVersionManager.SegmentState segmentState = this.svm.getSegmentState(segment);
        switch (segmentState) {
            case NOT_OWNED: {
                return Flowable.empty();
            }
            case OWNED: {
                return next.localPublisher(segment);
            }
            case BLOCKED: 
            case KEY_TRANSFER: 
            case VALUE_TRANSFER: {
                CompletionStage stage = this.svm.valuesFuture(this.dm.getCacheTopology().getTopologyId()).thenApply(__ -> next.localPublisher(segment));
                return Single.fromCompletionStage((CompletionStage)stage).flatMapPublisher(p -> p);
            }
        }
        throw new IllegalStateException();
    }

    private class BackingKeySet
    extends InternalCacheSet<K> {
        private final boolean ignoreOwnership;
        protected final CacheSet<CacheEntry<K, V>> next;

        public BackingKeySet(boolean ignoreOwnership, CacheSet<CacheEntry<K, V>> next) {
            this.ignoreOwnership = ignoreOwnership;
            this.next = next;
        }

        @Override
        public Publisher<K> localPublisher(IntSet segments) {
            return Flowable.fromIterable((Iterable)segments).concatMap(this::localPublisher);
        }

        @Override
        public Publisher<K> localPublisher(int segment) {
            return Flowable.fromPublisher(PrefetchInterceptor.this.getPublisher(segment, this.ignoreOwnership, this.next)).mapOptional(e -> e.getValue() != null ? Optional.of(e.getKey()) : Optional.empty());
        }
    }

    private class BackingEntrySet
    extends InternalCacheSet<CacheEntry<K, V>> {
        protected final CacheSet<CacheEntry<K, V>> next;
        private final boolean ignoreOwnership;

        public BackingEntrySet(boolean ignoreOwnership, CacheSet<CacheEntry<K, V>> next) {
            this.next = next;
            this.ignoreOwnership = ignoreOwnership;
        }

        @Override
        public Publisher<CacheEntry<K, V>> localPublisher(IntSet segments) {
            return Flowable.fromIterable((Iterable)segments).concatMap(this::localPublisher);
        }

        @Override
        public Publisher<CacheEntry<K, V>> localPublisher(int segment) {
            return Flowable.fromPublisher(PrefetchInterceptor.this.getPublisher(segment, this.ignoreOwnership, this.next)).filter(e -> e.getValue() != null);
        }
    }
}

