/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub.checkpoint;

import com.azure.core.util.CoreUtils;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix;
import org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ClusterNodeDisconnectedException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ConcurrentStateModificationException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.StateNotAvailableException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class ComponentStateCheckpointStore
implements CheckpointStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(ComponentStateCheckpointStore.class);
    private final String clientId;
    private final StateManager stateManager;

    public ComponentStateCheckpointStore(String clientId, StateManager stateManager) {
        this.clientId = clientId;
        this.stateManager = stateManager;
    }

    public void cleanUp(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        this.cleanUpMono(fullyQualifiedNamespace, eventHubName, consumerGroup).subscribe();
    }

    Mono<Void> cleanUpMono(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return this.getState().doFirst(() -> this.debug("cleanUp() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup)).flatMap(oldState -> {
            Map<String, String> newMap = oldState.toMap().entrySet().stream().filter(e -> {
                String key = (String)e.getKey();
                if (!key.startsWith(CheckpointStoreKeyPrefix.OWNERSHIP.keyPrefix()) && !key.startsWith(CheckpointStoreKeyPrefix.CHECKPOINT.keyPrefix())) {
                    return true;
                }
                PartitionContext context = ComponentStateCheckpointStoreUtils.convertPartitionContext(key);
                return context.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace) && context.getEventHubName().equalsIgnoreCase(eventHubName) && context.getConsumerGroup().equalsIgnoreCase(consumerGroup);
            }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            int removed = oldState.toMap().size() - newMap.size();
            if (removed > 0) {
                this.debug("cleanUp() -> Removed {} item(s)", removed);
                return this.updateState((StateMap)oldState, newMap);
            }
            this.debug("cleanUp() -> Nothing to clean up", new Object[0]);
            return Mono.empty();
        }).doOnSuccess(__ -> this.debug("cleanUp() -> Succeeded", new Object[0])).retryWhen(this.createRetrySpec("cleanUp")).doOnError(throwable -> this.debug("cleanUp() -> Failed: {}", throwable.getMessage()));
    }

    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return this.getState().doFirst(() -> this.debug("listOwnership() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup)).flatMapMany(state -> {
            this.checkDisconnectedNode((StateMap)state);
            return this.getOwnerships((StateMap)state);
        }).filter(ownership -> ownership.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace) && ownership.getEventHubName().equalsIgnoreCase(eventHubName) && ownership.getConsumerGroup().equalsIgnoreCase(consumerGroup)).doOnNext(partitionOwnership -> this.debug("listOwnership() -> Returning {}", ComponentStateCheckpointStoreUtils.ownershipToString(partitionOwnership))).doOnComplete(() -> this.debug("listOwnership() -> Succeeded", new Object[0])).doOnError(throwable -> this.debug("listOwnership() -> Failed: {}", throwable.getMessage()));
    }

    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
        return this.getState().doFirst(() -> this.debug("claimOwnership() -> Entering [{}]", ComponentStateCheckpointStoreUtils.ownershipListToString(requestedPartitionOwnerships))).flatMapMany(oldState -> {
            this.checkDisconnectedNode((StateMap)oldState);
            HashMap<String, String> newMap = new HashMap<String, String>(oldState.toMap());
            ArrayList<PartitionOwnership> claimedOwnerships = new ArrayList<PartitionOwnership>();
            long timestamp = System.currentTimeMillis();
            for (PartitionOwnership requestedPartitionOwnership : requestedPartitionOwnerships) {
                String key = ComponentStateCheckpointStoreUtils.createOwnershipKey(requestedPartitionOwnership);
                if (oldState.get(key) != null) {
                    PartitionOwnership oldPartitionOwnership = ComponentStateCheckpointStoreUtils.convertOwnership(key, oldState.get(key));
                    String oldETag = oldPartitionOwnership.getETag();
                    String reqETag = requestedPartitionOwnership.getETag();
                    if (StringUtils.isNotEmpty((CharSequence)oldETag) && !oldETag.equals(reqETag)) {
                        this.debug("claimOwnership() -> Already claimed {}", ComponentStateCheckpointStoreUtils.ownershipToString(oldPartitionOwnership));
                        continue;
                    }
                }
                String newETag = CoreUtils.randomUuid().toString();
                PartitionOwnership partitionOwnership = new PartitionOwnership().setFullyQualifiedNamespace(requestedPartitionOwnership.getFullyQualifiedNamespace()).setEventHubName(requestedPartitionOwnership.getEventHubName()).setConsumerGroup(requestedPartitionOwnership.getConsumerGroup()).setPartitionId(requestedPartitionOwnership.getPartitionId()).setOwnerId(requestedPartitionOwnership.getOwnerId()).setLastModifiedTime(Long.valueOf(timestamp)).setETag(newETag);
                claimedOwnerships.add(partitionOwnership);
                newMap.put(key, ComponentStateCheckpointStoreUtils.createOwnershipValue(partitionOwnership));
                this.debug("claimOwnership() -> Claiming {}", ComponentStateCheckpointStoreUtils.ownershipToString(partitionOwnership));
            }
            if (claimedOwnerships.isEmpty()) {
                return Flux.empty();
            }
            return this.updateState((StateMap)oldState, (Map<String, String>)newMap).thenMany((Publisher)Flux.fromIterable(claimedOwnerships));
        }).doOnNext(partitionOwnership -> this.debug("claimOwnership() -> Returning {}", ComponentStateCheckpointStoreUtils.ownershipToString(partitionOwnership))).doOnComplete(() -> this.debug("claimOwnership() -> Succeeded", new Object[0])).retryWhen(this.createRetrySpec("claimOwnership")).doOnError(throwable -> this.debug("claimOwnership() -> Failed: {}", throwable.getMessage()));
    }

    public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return this.getState().doFirst(() -> this.debug("listCheckpoints() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup)).flatMapMany(state -> {
            this.checkDisconnectedNode((StateMap)state);
            return this.getCheckpoints((StateMap)state);
        }).filter(checkpoint -> checkpoint.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace) && checkpoint.getEventHubName().equalsIgnoreCase(eventHubName) && checkpoint.getConsumerGroup().equalsIgnoreCase(consumerGroup)).doOnNext(checkpoint -> this.debug("listCheckpoints() -> Returning {}", ComponentStateCheckpointStoreUtils.checkpointToString(checkpoint))).doOnComplete(() -> this.debug("listCheckpoints() -> Succeeded", new Object[0])).doOnError(throwable -> this.debug("listCheckpoints() -> Failed: {}", throwable.getMessage()));
    }

    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        return this.getState().doFirst(() -> this.debug("updateCheckpoint() -> Entering [{}]", ComponentStateCheckpointStoreUtils.checkpointToString(checkpoint))).flatMap(oldState -> {
            this.checkDisconnectedNode((StateMap)oldState);
            HashMap<String, String> newMap = new HashMap<String, String>(oldState.toMap());
            newMap.put(ComponentStateCheckpointStoreUtils.createCheckpointKey(checkpoint), ComponentStateCheckpointStoreUtils.createCheckpointValue(checkpoint));
            return this.updateState((StateMap)oldState, (Map<String, String>)newMap);
        }).doOnSuccess(__ -> this.debug("updateCheckpoint() -> Succeeded", new Object[0])).retryWhen(this.createRetrySpec("updateCheckpoint")).doOnError(throwable -> this.debug("updateCheckpoint() -> Failed: {}", throwable.getMessage()));
    }

    private Retry createRetrySpec(String methodName) {
        return Retry.max((long)10L).filter(t -> t instanceof ConcurrentStateModificationException).doBeforeRetry(retrySignal -> this.debug(methodName + "() -> Retry: {}", retrySignal)).onRetryExhaustedThrow((retrySpec, retrySignal) -> new ConcurrentStateModificationException(String.format("Retrials of concurrent state modifications has been exhausted (%d retrials)", 10)));
    }

    private Flux<PartitionOwnership> getOwnerships(StateMap state) {
        return this.getEntries(state, CheckpointStoreKeyPrefix.OWNERSHIP.keyPrefix(), ComponentStateCheckpointStoreUtils::convertOwnership);
    }

    private Flux<Checkpoint> getCheckpoints(StateMap state) {
        return this.getEntries(state, CheckpointStoreKeyPrefix.CHECKPOINT.keyPrefix(), ComponentStateCheckpointStoreUtils::convertCheckpoint);
    }

    private <T> Flux<T> getEntries(StateMap state, String kind, BiFunction<String, String, T> converter) throws ProcessException {
        return state.toMap().entrySet().stream().filter(e -> ((String)e.getKey()).startsWith(kind)).map(e -> converter.apply((String)e.getKey(), (String)e.getValue())).collect(Collectors.collectingAndThen(Collectors.toList(), Flux::fromIterable));
    }

    private void checkDisconnectedNode(StateMap state) {
        boolean disconnectedNode = Boolean.parseBoolean(state.get(CheckpointStoreKey.CLUSTERED.key()));
        if (disconnectedNode) {
            throw new ClusterNodeDisconnectedException("The node has been disconnected from the cluster, the checkpoint store is not accessible");
        }
    }

    private void debug(String message, Object ... arguments) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[clientId={}] {}", (Object)ArrayUtils.addFirst((Object[])arguments, (Object)this.clientId), (Object)message);
        }
    }

    private Mono<StateMap> getState() {
        return Mono.defer(() -> {
            try {
                StateMap state = this.stateManager.getState(Scope.CLUSTER);
                return Mono.just((Object)state);
            }
            catch (Exception e) {
                return Mono.error((Throwable)new StateNotAvailableException(e));
            }
        });
    }

    private Mono<Void> updateState(StateMap oldState, Map<String, String> newMap) {
        return Mono.defer(() -> {
            try {
                boolean success = this.stateManager.replace(oldState, newMap, Scope.CLUSTER);
                if (success) {
                    return Mono.empty();
                }
                return Mono.error((Throwable)new ConcurrentStateModificationException(String.format("Component state with version [%s] has been modified by another instance", oldState.getStateVersion().orElse("new"))));
            }
            catch (Exception e) {
                return Mono.error((Throwable)new StateNotAvailableException(e));
            }
        });
    }
}

