/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub;

import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.ProxyOptions;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy;
import org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStore;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ComponentStateCheckpointStoreException;
import org.apache.nifi.processors.azure.eventhub.position.EarliestEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@Tags(value={"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription(value="Receives messages from Microsoft Azure Event Hubs with checkpointing to ensure consistent event processing. Checkpoint tracking avoids consuming a message multiple times and enables reliable resumption of processing in the event of intermittent network failures. Checkpoint tracking requires external storage and provides the preferred approach to consuming messages from Azure Event Hubs. In clustered environment, ConsumeAzureEventHub processor instances form a consumer group and the messages are distributed among the cluster nodes (each message is processed on one cluster node only).")
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="Local state is used to store the client id. Cluster state is used to store partition ownership and checkpoint information when component state is configured as the checkpointing strategy.")
@TriggerSerially
@WritesAttributes(value={@WritesAttribute(attribute="eventhub.enqueued.timestamp", description="The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"), @WritesAttribute(attribute="eventhub.offset", description="The offset into the partition at which the message was stored"), @WritesAttribute(attribute="eventhub.sequence", description="The sequence number associated with the message"), @WritesAttribute(attribute="eventhub.name", description="The name of the event hub from which the message was pulled"), @WritesAttribute(attribute="eventhub.partition", description="The name of the partition from which the message was pulled"), @WritesAttribute(attribute="eventhub.property.*", description="The application properties of this message. IE: 'application' would be 'eventhub.property.application'")})
public class ConsumeAzureEventHub
extends AbstractSessionFactoryProcessor
implements AzureEventHubComponent {
    // Accepts any value whose first character is a literal question mark.
    private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$");

    // Connection string templates; each carries three %s placeholders.
    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY =
            "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s";
    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN =
            "BlobEndpoint=https://%s.blob.core.%s/;SharedAccessSignature=%s";

    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
            .name("Event Hub Namespace")
            .description("The namespace that the Azure Event Hubs is assigned to. This is generally equal to <Event Hub Names>-ns.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("Event Hub Name")
            .description("The name of the event hub to pull messages from.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    // Descriptors shared with other Event Hub components are re-exposed under local names.
    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
    static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY;
    static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;

    // Only applicable when the Shared Access Signature authentication strategy is selected.
    static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Name")
            .description("The name of the shared access policy. This policy must have Listen claims.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .dependsOn(AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
            .build();

    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;

    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
            .name("Consumer Group")
            .description("The name of the consumer group to use.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .defaultValue("$Default")
            .required(true)
            .build();

    // Reader and writer are both optional here; customValidate rejects configuring only one of them.
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .description("The Record Reader to use for reading received messages. The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema.")
            .identifiesControllerService(RecordReaderFactory.class)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .description("The Record Writer to use for serializing Records to an output FlowFile. The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema. If not specified, each message will create a FlowFile.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final AllowableValue INITIAL_OFFSET_START_OF_STREAM = new AllowableValue(
            "start-of-stream",
            "Start of stream",
            "Read from the oldest message retained in the stream.");

    static final AllowableValue INITIAL_OFFSET_END_OF_STREAM = new AllowableValue(
            "end-of-stream",
            "End of stream",
            "Ignore old retained messages even if exist, start reading new ones from now.");

    static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder()
            .name("Initial Offset")
            .description("Specify where to start receiving messages if offset is not yet stored in the checkpoint store.")
            .required(true)
            .allowableValues(INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM)
            .defaultValue(INITIAL_OFFSET_END_OF_STREAM)
            .build();
    // The explanatory text is the property description. It was previously passed to
    // defaultValue(...) and then overwritten by defaultValue("300"), which left the
    // property without any description. Only "300" is the actual default.
    static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
            .name("Prefetch Count")
            .description("The number of messages to fetch from the event hub before processing. This parameter affects throughput. The more prefetch count, the better throughput in general, but consumes more resources (RAM). NOTE: Even though the event hub client API provides this option, actual number of messages can be pre-fetched is depend on the Event Hubs server implementation. It is reported that only one event is received at a time in certain situation. https://github.com/Azure/azure-event-hubs-java/issues/125")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("300")
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("The number of messages to process within a NiFi session. This parameter affects throughput and consistency. NiFi commits its session and Event Hubs checkpoints after processing this number of messages. If NiFi session is committed, but fails to create an Event Hubs checkpoint, then it is possible that the same messages will be received again. The higher number, the higher throughput, but possibly less consistent.")
            .defaultValue("10")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Message Receive Timeout")
            .description("The amount of time this consumer should wait to receive the Batch Size before returning.")
            .defaultValue("1 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    // Selects the checkpoint backend; the storage-related descriptors below depend on it.
    static final PropertyDescriptor CHECKPOINT_STRATEGY = new PropertyDescriptor.Builder()
            .name("Checkpoint Strategy")
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .allowableValues(CheckpointStrategy.class)
            .defaultValue(CheckpointStrategy.AZURE_BLOB_STORAGE.getValue())
            .description("Specifies which strategy to use for storing and retrieving partition ownership and checkpoint information for each partition.")
            .build();

    static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
            .name("Storage Account Name")
            .description("Name of the Azure Storage account to store event hub consumer group state.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
            .build();

    // Chooses between account key, SAS token and OAuth2 for Blob Storage access.
    static final PropertyDescriptor BLOB_STORAGE_AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
            .name("Blob Storage Authentication Strategy")
            .description("Authentication strategy used to access Azure Blob Storage when persisting checkpoints.")
            .allowableValues(BlobStorageAuthenticationStrategy.class)
            .defaultValue(BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getValue())
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
            .build();

    static final PropertyDescriptor BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
            .name("Storage Access Token Provider")
            .description("Controller Service providing OAuth2 Access Tokens for authenticating to Azure Blob Storage when persisting checkpoints.")
            .identifiesControllerService(OAuth2AccessTokenProvider.class)
            .required(true)
            .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.OAUTH2)
            .build();

    // Declared optional; customValidate enforces presence when the account key strategy is active.
    static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
            .name("Storage Account Key")
            .description("The Azure Storage account key to store event hub consumer group state.")
            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY)
            .build();

    // Declared optional; customValidate enforces presence when the SAS strategy is active.
    static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder()
            .name("Storage SAS Token")
            .description("The Azure Storage SAS token to store Event Hub consumer group state. Always starts with a ? character.")
            .sensitive(true)
            .addValidator(StandardValidators.createRegexMatchingValidator(SAS_TOKEN_PATTERN, true, "Token must start with a ? character."))
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
            .build();

    static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
            .name("Storage Container Name")
            .description("Name of the Azure Storage container to store the event hub consumer group state. If not specified, event hub name is used.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
            .build();
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles received from Event Hub.")
            .build();

    static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
            .name("parse.failure")
            .description("If a message from event hub cannot be parsed using the configured Record Reader or failed to be written by the configured Record Writer, the contents of the message will be routed to this Relationship as its own individual FlowFile.")
            .build();

    // Relationship set returned by getRelationships() unless both reader and writer are configured.
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    // Relationship set returned by getRelationships() when both reader and writer are configured.
    private static final Set<Relationship> RECORD_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);

    // Full list of supported properties, returned as-is by getSupportedPropertyDescriptors().
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            NAMESPACE,
            EVENT_HUB_NAME,
            SERVICE_BUS_ENDPOINT,
            TRANSPORT_TYPE,
            ACCESS_POLICY_NAME,
            POLICY_PRIMARY_KEY,
            AUTHENTICATION_STRATEGY,
            EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
            CONSUMER_GROUP,
            RECORD_READER,
            RECORD_WRITER,
            INITIAL_OFFSET,
            PREFETCH_COUNT,
            BATCH_SIZE,
            RECEIVE_TIMEOUT,
            CHECKPOINT_STRATEGY,
            STORAGE_ACCOUNT_NAME,
            STORAGE_CONTAINER_NAME,
            BLOB_STORAGE_AUTHENTICATION_STRATEGY,
            STORAGE_ACCOUNT_KEY,
            STORAGE_SAS_TOKEN,
            BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
            PROXY_CONFIGURATION_SERVICE
    );
    // Assigned in onTrigger when the client is first created; reset to null by stopClient.
    private volatile ProcessSessionFactory processSessionFactory;
    private volatile EventProcessorClient eventProcessorClient;
    private volatile RecordReaderFactory readerFactory;
    private volatile RecordSetWriterFactory writerFactory;

    // Maintained by onPropertyModified; both start out false and drive getRelationships().
    private volatile boolean isRecordReaderSet;
    private volatile boolean isRecordWriterSet;

    // Assigned in onScheduled from local state (or freshly generated); reset to null by stopClient.
    private volatile String clientId;
    /**
     * Callback handling one batch of events inside a dedicated process session.
     *
     * <p>When both a reader factory and a writer factory are available the batch is
     * written as records, otherwise as individual FlowFiles. The checkpoint update is
     * handed to {@code commitAsync} as its callback. Any exception raised while writing
     * or committing is logged with the partition details and the session is rolled back.
     */
    protected final Consumer<EventBatchContext> eventBatchProcessor = batchContext -> {
        final ProcessSession processSession = this.processSessionFactory.createSession();
        try {
            final StopWatch timer = new StopWatch(true);
            final boolean recordOriented = this.readerFactory != null && this.writerFactory != null;
            if (recordOriented) {
                this.writeRecords(batchContext, processSession, timer);
            } else {
                this.writeFlowFiles(batchContext, processSession, timer);
            }
            // The checkpoint is only touched from the commit callback, never before the commit is requested.
            processSession.commitAsync(() -> batchContext.updateCheckpoint());
        } catch (Exception failure) {
            final PartitionContext partition = batchContext.getPartitionContext();
            this.getLogger().error(
                    "Event Batch processing failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
                    partition.getFullyQualifiedNamespace(),
                    partition.getEventHubName(),
                    partition.getConsumerGroup(),
                    partition.getPartitionId(),
                    failure);
            processSession.rollback();
        }
    };
    /**
     * Callback that logs errors reported for a partition.
     *
     * <p>An {@link AmqpException} whose condition is {@code LINK_STOLEN} is logged at info
     * level and handled as a non-error. Everything else is logged at error level, with a
     * dedicated message prefix for {@link ComponentStateCheckpointStoreException}.
     */
    private final Consumer<ErrorContext> errorProcessor = failureContext -> {
        final PartitionContext partition = failureContext.getPartitionContext();
        final Throwable cause = failureContext.getThrowable();

        if (cause instanceof AmqpException) {
            final AmqpException amqpFailure = (AmqpException) cause;
            if (amqpFailure.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN) {
                this.getLogger().info(
                        "Partition was stolen by another consumer instance from the consumer group. Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]. {}",
                        partition.getFullyQualifiedNamespace(),
                        partition.getEventHubName(),
                        partition.getConsumerGroup(),
                        partition.getPartitionId(),
                        amqpFailure.getMessage());
                return;
            }
        }

        final String summary;
        if (cause instanceof ComponentStateCheckpointStoreException) {
            summary = "Failed to access Component State Checkpoint Store";
        } else {
            summary = "Receive Events failed";
        }
        this.getLogger().error(
                "{}. Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
                summary,
                partition.getFullyQualifiedNamespace(),
                partition.getEventHubName(),
                partition.getConsumerGroup(),
                partition.getPartitionId(),
                cause);
    };

    /**
     * Returns the fixed list of properties this processor supports.
     *
     * @return the shared {@code PROPERTY_DESCRIPTORS} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the relationships currently exposed by this processor.
     *
     * @return success plus parse failure when both the record reader and the record
     *         writer flags are set, otherwise success only
     */
    public Set<Relationship> getRelationships() {
        if (this.isRecordReaderSet && this.isRecordWriterSet) {
            return RECORD_RELATIONSHIPS;
        }
        return RELATIONSHIPS;
    }

    /**
     * Migrates a configuration that uses legacy property names to the current ones.
     *
     * <p>After renaming, two strategy properties are derived when needed:
     * <ul>
     *   <li>the Blob Storage authentication strategy, when it has no non-blank raw value:
     *       SAS when only a SAS token is configured, otherwise account key;</li>
     *   <li>the Event Hub authentication strategy, when it has no non-blank raw value or the
     *       legacy managed identity flag is present: the flag value decides, and without a
     *       flag value managed identity is chosen unless shared access credentials exist.</li>
     * </ul>
     * The legacy managed identity flag is removed afterwards.
     *
     * @param config the property configuration to migrate in place
     */
    public void migrateProperties(PropertyConfiguration config) {
        final String legacyManagedIdentityProperty = "Use Azure Managed Identity";

        config.removeProperty("event-hub-consumer-hostname");
        config.renameProperty("event-hub-namespace", NAMESPACE.getName());
        config.renameProperty("event-hub-name", EVENT_HUB_NAME.getName());
        config.renameProperty("event-hub-shared-access-policy-name", ACCESS_POLICY_NAME.getName());
        config.renameProperty("event-hub-consumer-group", CONSUMER_GROUP.getName());
        config.renameProperty("record-reader", RECORD_READER.getName());
        config.renameProperty("record-writer", RECORD_WRITER.getName());
        config.renameProperty("event-hub-initial-offset", INITIAL_OFFSET.getName());
        config.renameProperty("event-hub-prefetch-count", PREFETCH_COUNT.getName());
        config.renameProperty("event-hub-batch-size", BATCH_SIZE.getName());
        config.renameProperty("event-hub-message-receive-timeout", RECEIVE_TIMEOUT.getName());
        config.renameProperty("checkpoint-strategy", CHECKPOINT_STRATEGY.getName());
        config.renameProperty("storage-account-name", STORAGE_ACCOUNT_NAME.getName());
        config.renameProperty("storage-account-key", STORAGE_ACCOUNT_KEY.getName());
        config.renameProperty("storage-sas-token", STORAGE_SAS_TOKEN.getName());
        config.renameProperty("storage-container-name", STORAGE_CONTAINER_NAME.getName());
        config.renameProperty("event-hub-shared-access-policy-primary-key", POLICY_PRIMARY_KEY.getName());
        config.renameProperty("use-managed-identity", legacyManagedIdentityProperty);

        // Blob Storage authentication strategy: derive it only when nothing usable is configured.
        final boolean blobStrategyConfigured = config.getRawPropertyValue(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName())
                .map(String::trim)
                .filter(StringUtils::isNotBlank)
                .isPresent();
        final boolean accountKeyConfigured = this.hasConfiguredValue(config, STORAGE_ACCOUNT_KEY);
        final boolean sasTokenConfigured = this.hasConfiguredValue(config, STORAGE_SAS_TOKEN);
        if (!blobStrategyConfigured) {
            final BlobStorageAuthenticationStrategy derivedBlobStrategy;
            if (sasTokenConfigured && !accountKeyConfigured) {
                derivedBlobStrategy = BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE;
            } else {
                derivedBlobStrategy = BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY;
            }
            config.setProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName(), derivedBlobStrategy.getValue());
        }

        // Event Hub authentication strategy: the legacy flag, when present, always wins.
        final boolean eventHubStrategyConfigured = config.getRawPropertyValue(AUTHENTICATION_STRATEGY.getName())
                .map(String::trim)
                .filter(StringUtils::isNotBlank)
                .isPresent();
        final boolean legacyFlagPresent = config.hasProperty(legacyManagedIdentityProperty);
        final boolean sharedAccessCredentialsConfigured = this.hasConfiguredValue(config, ACCESS_POLICY_NAME)
                || this.hasConfiguredValue(config, POLICY_PRIMARY_KEY);
        if (!eventHubStrategyConfigured || legacyFlagPresent) {
            final boolean managedIdentity = config.getPropertyValue(legacyManagedIdentityProperty)
                    .map(Boolean::parseBoolean)
                    .orElse(!sharedAccessCredentialsConfigured);
            final AzureEventHubAuthenticationStrategy derivedStrategy;
            if (managedIdentity) {
                derivedStrategy = AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY;
            } else {
                derivedStrategy = AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE;
            }
            config.setProperty(AUTHENTICATION_STRATEGY.getName(), derivedStrategy.getValue());
        }

        config.removeProperty(legacyManagedIdentityProperty);
        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
    }

    /**
     * Tells whether the configuration holds a non-blank value for the given property.
     *
     * @param config     the configuration to inspect
     * @param descriptor the property whose name is looked up
     * @return {@code true} when a value exists and is not blank
     */
    private boolean hasConfiguredValue(PropertyConfiguration config, PropertyDescriptor descriptor) {
        final Optional<String> nonBlankValue = config.getPropertyValue(descriptor.getName())
                .filter(StringUtils::isNotBlank);
        return nonBlankValue.isPresent();
    }

    /**
     * Performs the cross-property validation for this processor.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>Record Reader and Record Writer must be configured together or not at all.</li>
     *   <li>With the Azure Blob Storage checkpoint strategy, the credential matching the
     *       selected Blob Storage authentication strategy must be set and the credentials
     *       belonging to other strategies must not be set.</li>
     *   <li>The results of {@code AzureEventHubUtils.customValidate} are appended.</li>
     * </ul>
     *
     * @param validationContext the context providing the configured property values
     * @return the validation failures found; empty when none of the checks failed
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>();

        final ControllerService reader = validationContext.getProperty(RECORD_READER).asControllerService();
        final ControllerService writer = validationContext.getProperty(RECORD_WRITER).asControllerService();
        final String accountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
        final String sasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
        final boolean tokenProviderSet = validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet();

        // Exactly one of reader/writer configured is the invalid combination.
        final boolean onlyOneRecordServiceSet = (reader == null) != (writer == null);
        if (onlyOneRecordServiceSet) {
            problems.add(new ValidationResult.Builder()
                    .subject("Record Reader and Writer")
                    .explanation("Both %s and %s should be set in order to write FlowFiles as Records.".formatted(RECORD_READER.getDisplayName(), RECORD_WRITER.getDisplayName()))
                    .valid(false)
                    .build());
        }

        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
            final String mustBeSet = "%s must be set when %s is %s.";
            final String mustNotBeSet = "%s must not be set when %s is %s.";
            final String strategyLabel = BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName();
            final String accountKeyLabel = STORAGE_ACCOUNT_KEY.getDisplayName();
            final String sasTokenLabel = STORAGE_SAS_TOKEN.getDisplayName();
            final boolean accountKeyPresent = StringUtils.isNotBlank(accountKey);
            final boolean sasTokenPresent = StringUtils.isNotBlank(sasToken);

            final BlobStorageAuthenticationStrategy blobStrategy =
                    validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);

            if (blobStrategy == BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
                final String selected = BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName();
                if (StringUtils.isBlank(accountKey)) {
                    problems.add(new ValidationResult.Builder()
                            .subject(accountKeyLabel)
                            .explanation(mustBeSet.formatted(accountKeyLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
                if (sasTokenPresent) {
                    problems.add(new ValidationResult.Builder()
                            .subject(sasTokenLabel)
                            .explanation(mustNotBeSet.formatted(sasTokenLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
            } else if (blobStrategy == BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE) {
                final String selected = BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName();
                if (StringUtils.isBlank(sasToken)) {
                    problems.add(new ValidationResult.Builder()
                            .subject(sasTokenLabel)
                            .explanation(mustBeSet.formatted(sasTokenLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
                if (accountKeyPresent) {
                    problems.add(new ValidationResult.Builder()
                            .subject(accountKeyLabel)
                            .explanation(mustNotBeSet.formatted(accountKeyLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
            } else if (blobStrategy == BlobStorageAuthenticationStrategy.OAUTH2) {
                final String selected = BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName();
                final String tokenProviderLabel = BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName();
                if (!tokenProviderSet) {
                    problems.add(new ValidationResult.Builder()
                            .subject(tokenProviderLabel)
                            .explanation(mustBeSet.formatted(tokenProviderLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
                if (accountKeyPresent) {
                    problems.add(new ValidationResult.Builder()
                            .subject(accountKeyLabel)
                            .explanation(mustNotBeSet.formatted(accountKeyLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
                if (sasTokenPresent) {
                    problems.add(new ValidationResult.Builder()
                            .subject(sasTokenLabel)
                            .explanation(mustNotBeSet.formatted(sasTokenLabel, strategyLabel, selected))
                            .valid(false)
                            .build());
                }
            }
        }

        problems.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext));
        return problems;
    }

    /**
     * Tracks whether the Record Reader and Record Writer properties hold a value, which
     * determines the relationship set returned by {@code getRelationships()}.
     *
     * @param descriptor the property that changed
     * @param oldValue   the previous value (not used)
     * @param newValue   the new value; null or empty counts as "not set"
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        final boolean valuePresent = StringUtils.isNotEmpty(newValue);
        if (RECORD_READER.equals(descriptor)) {
            this.isRecordReaderSet = valuePresent;
            return;
        }
        if (RECORD_WRITER.equals(descriptor)) {
            this.isRecordWriterSet = valuePresent;
        }
    }

    /**
     * Resolves the client id for this processor from local state when the processor is scheduled.
     *
     * <p>If local state already holds a client id it is reused. Otherwise a random UUID is
     * generated and written to local state together with the current clustered flag.
     *
     * @param context the process context giving access to the state manager
     * @throws IOException if reading or writing the local state fails
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        final StateManager localStateManager = context.getStateManager();
        final String storedClientId = localStateManager.getState(Scope.LOCAL).get(CheckpointStoreKey.CLIENT_ID.key());
        if (storedClientId != null) {
            this.clientId = storedClientId;
            return;
        }

        final String generatedClientId = UUID.randomUUID().toString();
        final Map<String, String> localState = new HashMap<>();
        localState.put(CheckpointStoreKey.CLIENT_ID.key(), generatedClientId);
        localState.put(CheckpointStoreKey.CLUSTERED.key(), Boolean.toString(this.getNodeTypeProvider().isClustered()));
        localStateManager.setState(localState, Scope.LOCAL);
        // The field is only published once the state has been stored successfully.
        this.clientId = generatedClientId;
    }

    /**
     * Creates and starts the Event Processor Client on the first invocation, then yields.
     *
     * <p>Once a client exists, further invocations only yield; this method itself does
     * not receive or write any events.
     *
     * @param context        the process context used to resolve the record services and build the client
     * @param sessionFactory the factory stored for later session creation
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) {
        if (this.eventProcessorClient != null) {
            context.yield();
            return;
        }

        this.processSessionFactory = sessionFactory;
        this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        this.writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

        // The field is assigned before start() so that stopClient() can see the client even if start() fails.
        final EventProcessorClient client = this.createClient(context);
        this.eventProcessorClient = client;
        client.start();

        context.yield();
    }

    /**
     * Stops the Event Processor Client, if one exists, and clears the per-run fields.
     *
     * <p>A failure while stopping is logged as a warning and does not prevent the fields
     * from being reset. When no client exists nothing is changed.
     */
    @OnStopped
    public void stopClient() {
        final EventProcessorClient client = this.eventProcessorClient;
        if (client == null) {
            return;
        }

        try {
            client.stop();
        } catch (Exception stopFailure) {
            this.getLogger().warn("Event Processor Client stop failed", stopFailure);
        }

        this.eventProcessorClient = null;
        this.processSessionFactory = null;
        this.readerFactory = null;
        this.writerFactory = null;
        this.clientId = null;
    }

    /**
     * Builds the {@link EventProcessorClient} from the processor's configured properties.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>Resolve namespace, Event Hub name, Service Bus endpoint and consumer group; the fully
     *       qualified namespace is the namespace immediately followed by the endpoint.</li>
     *   <li>Select the checkpoint store: a {@link BlobCheckpointStore} backed by an Azure Blob
     *       container, or a {@code ComponentStateCheckpointStore} backed by component state.</li>
     *   <li>Configure transport, batching, error/event handlers and credentials.</li>
     *   <li>Apply the optional prefetch count, initial partition positions and proxy options.</li>
     * </ol>
     *
     * @param context process context providing property values and the state manager
     * @return a configured, not yet started, Event Processor Client
     */
    protected EventProcessorClient createClient(ProcessContext context) {
        final String eventHubNamespace = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
        final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
        final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
        final String fullyQualifiedNamespace = String.format("%s%s", eventHubNamespace, serviceBusEndpoint);

        // Declared with the common interface / element types so that both branches below can assign them.
        final CheckpointStore checkpointStore;
        final Map<String, EventPosition> legacyPartitionEventPosition;

        final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(context.getProperty(CHECKPOINT_STRATEGY).getValue());
        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
            // The container name falls back to the Event Hub name when the property is blank.
            final String containerName = org.apache.commons.lang3.StringUtils.defaultIfBlank(
                    context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
            final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
            final String domainName = getStorageDomainName(serviceBusEndpoint);
            final BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy =
                    context.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);

            final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder();
            switch (blobStorageAuthenticationStrategy) {
                // Account key and SAS token are both supplied through a connection string.
                case STORAGE_ACCOUNT_KEY, SHARED_ACCESS_SIGNATURE -> {
                    final String storageConnectionString =
                            createStorageConnectionString(context, blobStorageAuthenticationStrategy, storageAccountName, domainName);
                    blobContainerClientBuilder.connectionString(storageConnectionString);
                }
                // OAuth2 needs an explicit endpoint plus a token credential.
                case OAUTH2 -> {
                    final OAuth2AccessTokenProvider tokenProvider =
                            context.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
                    final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
                    final String endpoint = createBlobEndpoint(storageAccountName, domainName);
                    blobContainerClientBuilder.endpoint(endpoint);
                    blobContainerClientBuilder.credential(tokenCredential);
                }
            }
            blobContainerClientBuilder.containerName(containerName);

            final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context);
            if (storageProxyOptions != null) {
                blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions));
            }

            final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient();
            checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
            legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
        } else {
            final ComponentStateCheckpointStore componentStateCheckpointStore =
                    new ComponentStateCheckpointStore(this.clientId, context.getStateManager());
            componentStateCheckpointStore.cleanUp(fullyQualifiedNamespace, eventHubName, consumerGroup);
            checkpointStore = componentStateCheckpointStore;
            legacyPartitionEventPosition = Collections.emptyMap();
        }

        final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        final Duration maxWaitTime = Duration.ofMillis(receiveTimeout);
        final Integer maxBatchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        final AmqpTransportType transportType =
                context.getProperty(TRANSPORT_TYPE).asAllowableValue(AzureEventHubTransportType.class).asAmqpTransportType();

        final EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .transportType(transportType)
                .consumerGroup(consumerGroup)
                .clientOptions(new AmqpClientOptions().setIdentifier(this.clientId))
                .trackLastEnqueuedEventProperties(true)
                .checkpointStore(checkpointStore)
                .processError(this.errorProcessor)
                .processEventBatch(this.eventBatchProcessor, maxBatchSize.intValue(), maxWaitTime);

        // Managed Identity is used when the authentication strategy property resolves to null.
        final AzureEventHubAuthenticationStrategy configuredAuthenticationStrategy =
                context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class);
        final AzureEventHubAuthenticationStrategy authenticationStrategy = configuredAuthenticationStrategy == null
                ? AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY
                : configuredAuthenticationStrategy;
        switch (authenticationStrategy) {
            case MANAGED_IDENTITY -> {
                final TokenCredential managedIdentityCredential = new ManagedIdentityCredentialBuilder().build();
                eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
            }
            case SHARED_ACCESS_SIGNATURE -> {
                final String policyName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
                final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
                final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
                eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
            }
            case OAUTH2 -> {
                final OAuth2AccessTokenProvider tokenProvider =
                        context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
                final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider);
                eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential);
            }
        }

        // The prefetch count is only applied when it is configured and positive.
        final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
        if (prefetchCount != null && prefetchCount > 0) {
            eventProcessorClientBuilder.prefetchCount(prefetchCount.intValue());
        }

        if (legacyPartitionEventPosition.isEmpty()) {
            // No legacy checkpoints: an explicit initial position is only set for "start of stream";
            // for any other Initial Offset value the builder is left without initial positions.
            final String initialOffset = context.getProperty(INITIAL_OFFSET).getValue();
            if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
                final EarliestEventPositionProvider eventPositionProvider = new EarliestEventPositionProvider();
                final Map<String, EventPosition> partitionEventPosition = eventPositionProvider.getInitialPartitionEventPosition();
                eventProcessorClientBuilder.initialPartitionEventPosition(partitionEventPosition);
            }
        } else {
            // Positions loaded from legacy checkpoint storage take precedence over the Initial Offset property.
            eventProcessorClientBuilder.initialPartitionEventPosition(legacyPartitionEventPosition);
        }

        AzureEventHubUtils.getProxyOptions(context).ifPresent(eventProcessorClientBuilder::proxyOptions);
        return eventProcessorClientBuilder.buildEventProcessorClient();
    }

    /**
     * Formats the transit URI for a partition.
     *
     * @param partitionContext partition whose namespace, Event Hub name, consumer group and
     *                         partition id are placed into the URI
     * @return a string of the form
     *         {@code amqps://<namespace>/<eventHub>/ConsumerGroups/<group>/Partitions/<partitionId>}
     */
    protected String getTransitUri(PartitionContext partitionContext) {
        final String namespace = partitionContext.getFullyQualifiedNamespace();
        final String eventHubName = partitionContext.getEventHubName();
        final String consumerGroup = partitionContext.getConsumerGroup();
        final String partitionId = partitionContext.getPartitionId();
        return String.format("amqps://%s/%s/ConsumerGroups/%s/Partitions/%s", namespace, eventHubName, consumerGroup, partitionId);
    }

    /**
     * Adds Event Hub related entries to the given attribute map.
     * <p>
     * Entries are written in a fixed order, so later writes win on key collisions:
     * first the timestamp/offset/sequence entries (only when {@code lastEnqueuedEventProperties}
     * is non-null, and taken from that object rather than from {@code eventData}), then the
     * application properties derived from {@code eventData}, and finally the Event Hub name and
     * partition id.
     *
     * @param attributes                  map that receives the entries
     * @param partitionContext            source of the Event Hub name and partition id
     * @param eventData                   event whose properties are converted to attributes
     * @param lastEnqueuedEventProperties source of enqueued time, offset and sequence number; may be {@code null}
     */
    private void putEventHubAttributes(Map<String, String> attributes, PartitionContext partitionContext, EventData eventData, LastEnqueuedEventProperties lastEnqueuedEventProperties) {
        final boolean hasLastEnqueued = lastEnqueuedEventProperties != null;
        if (hasLastEnqueued) {
            final String enqueuedTime = String.valueOf(lastEnqueuedEventProperties.getEnqueuedTime());
            final String offset = lastEnqueuedEventProperties.getOffsetString();
            final String sequenceNumber = String.valueOf(lastEnqueuedEventProperties.getSequenceNumber());
            attributes.put("eventhub.enqueued.timestamp", enqueuedTime);
            attributes.put("eventhub.offset", offset);
            attributes.put("eventhub.sequence", sequenceNumber);
        }

        attributes.putAll(AzureEventHubUtils.getApplicationProperties(eventData.getProperties()));

        attributes.put("eventhub.name", partitionContext.getEventHubName());
        attributes.put("eventhub.partition", partitionContext.getPartitionId());
    }

    /**
     * Writes every event of the batch to its own FlowFile and transfers it to {@code REL_SUCCESS}.
     * <p>
     * For each event a FlowFile is created, the Event Hub attributes are applied, the event body
     * is written as content, and the FlowFile is transferred.
     *
     * @param eventBatchContext batch providing the partition context, events and last enqueued properties
     * @param session           session used to create, write and transfer FlowFiles
     * @param stopWatch         stop watch whose elapsed time is reported with the transfer
     */
    private void writeFlowFiles(EventBatchContext eventBatchContext, ProcessSession session, StopWatch stopWatch) {
        final PartitionContext partitionContext = eventBatchContext.getPartitionContext();
        final LastEnqueuedEventProperties lastEnqueuedEventProperties = eventBatchContext.getLastEnqueuedEventProperties();
        // Typed list: with a raw List the element type degrades to Object and getBody() cannot be resolved.
        final List<EventData> events = eventBatchContext.getEvents();

        for (final EventData eventData : events) {
            final Map<String, String> attributes = new HashMap<>();
            putEventHubAttributes(attributes, partitionContext, eventData, lastEnqueuedEventProperties);

            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, attributes);

            final byte[] body = eventData.getBody();
            flowFile = session.write(flowFile, outputStream -> outputStream.write(body));

            transferTo(REL_SUCCESS, session, stopWatch, partitionContext, flowFile);
        }
    }

    /**
     * Parses every event of the batch with the configured Record Reader and writes all resulting
     * records into a single FlowFile using the configured Record Writer.
     * <p>
     * Events that fail while being read or written are logged and routed individually, with their
     * raw body as content, to {@code REL_PARSE_FAILURE}. The combined FlowFile is transferred to
     * {@code REL_SUCCESS} when at least one event was processed without an exception; otherwise it
     * is removed from the session.
     *
     * @param eventBatchContext batch providing the partition context, events and last enqueued properties
     * @param session           session used to create, write, remove and transfer FlowFiles
     * @param stopWatch         stop watch whose elapsed time is reported with each transfer
     * @throws IOException if opening or closing the output stream, or finishing the record set, fails
     */
    private void writeRecords(EventBatchContext eventBatchContext, ProcessSession session, StopWatch stopWatch) throws IOException {
        final PartitionContext partitionContext = eventBatchContext.getPartitionContext();
        final Map<String, String> schemaRetrievalVariables = new HashMap<>();
        schemaRetrievalVariables.put("eventhub.name", partitionContext.getEventHubName());
        final ComponentLog logger = getLogger();

        FlowFile flowFile = session.create();
        // Attributes of the combined (success) FlowFile only; failure FlowFiles use their own map.
        final Map<String, String> attributes = new HashMap<>();
        RecordSetWriter writer = null;
        EventData lastEventData = null;
        WriteResult lastWriteResult = null;
        int recordCount = 0;
        final LastEnqueuedEventProperties lastEnqueuedEventProperties = eventBatchContext.getLastEnqueuedEventProperties();
        // Typed list: a raw List cannot be iterated as EventData.
        final List<EventData> events = eventBatchContext.getEvents();

        try (OutputStream out = session.write(flowFile)) {
            for (final EventData eventData : events) {
                final byte[] eventDataBytes = eventData.getBody();
                try (InputStream in = new ByteArrayInputStream(eventDataBytes)) {
                    final RecordReader reader = this.readerFactory.createRecordReader(schemaRetrievalVariables, in, eventDataBytes.length, logger);
                    Record record;
                    while ((record = reader.nextRecord()) != null) {
                        if (writer == null) {
                            // The writer is created lazily because its schema is derived from the first record read.
                            final RecordSchema readerSchema = record.getSchema();
                            final RecordSchema writeSchema = this.writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
                            writer = this.writerFactory.createWriter(logger, writeSchema, out, flowFile);
                            writer.beginRecordSet();
                        }
                        lastWriteResult = writer.write(record);
                        recordCount += lastWriteResult.getRecordCount();
                    }
                    // Only events that were fully consumed without an exception count as "last event".
                    lastEventData = eventData;
                } catch (final Exception e) {
                    logger.error("Failed to parse message from Azure Event Hub using configured Record Reader and Writer", e);

                    // Use a dedicated map so that properties of a failed event neither accumulate across
                    // failures nor end up on the success FlowFile.
                    final Map<String, String> failureAttributes = new HashMap<>();
                    FlowFile failed = session.create();
                    // Keep the FlowFile returned by the session; it is the current version after the write.
                    failed = session.write(failed, o -> o.write(eventData.getBody()));
                    putEventHubAttributes(failureAttributes, partitionContext, eventData, lastEnqueuedEventProperties);
                    failed = session.putAllAttributes(failed, failureAttributes);
                    transferTo(REL_PARSE_FAILURE, session, stopWatch, partitionContext, failed);
                }
            }

            if (lastEventData != null) {
                putEventHubAttributes(attributes, partitionContext, lastEventData, lastEnqueuedEventProperties);
                attributes.put("record.count", String.valueOf(recordCount));
                if (writer != null) {
                    writer.finishRecordSet();
                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                    if (lastWriteResult != null) {
                        attributes.putAll(lastWriteResult.getAttributes());
                    }
                    try {
                        writer.close();
                    } catch (final IOException e) {
                        // Best effort: the record set has already been finished at this point.
                        logger.warn("Failed to close Record Writer", e);
                    }
                }
            }
        }

        if (lastEventData == null) {
            // No event was processed successfully, so there is nothing to emit on success.
            session.remove(flowFile);
        } else {
            flowFile = session.putAllAttributes(flowFile, attributes);
            transferTo(REL_SUCCESS, session, stopWatch, partitionContext, flowFile);
        }
    }

    /**
     * Transfers the FlowFile to the given relationship and then reports a provenance receive
     * event using the partition's transit URI and the stop watch's elapsed milliseconds.
     *
     * @param relationship     destination relationship
     * @param session          session owning the FlowFile
     * @param stopWatch        source of the elapsed time reported with the event
     * @param partitionContext partition used to build the transit URI
     * @param flowFile         FlowFile to transfer and report
     */
    private void transferTo(Relationship relationship, ProcessSession session, StopWatch stopWatch, PartitionContext partitionContext, FlowFile flowFile) {
        session.transfer(flowFile, relationship);
        session.getProvenanceReporter().receive(
                flowFile,
                getTransitUri(partitionContext),
                stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    }

    /**
     * Formats the Blob Storage connection string for connection-string based strategies.
     * <p>
     * Both the account key and the SAS token properties are read up front; only the one matching
     * the strategy is placed into the result.
     *
     * @param context                           process context providing the key / token properties
     * @param blobStorageAuthenticationStrategy strategy selecting the connection string format
     * @param storageAccountName                storage account name inserted into the string
     * @param domainName                        storage domain name inserted into the string
     * @return the formatted connection string
     * @throws IllegalArgumentException if the strategy is {@code OAUTH2}
     */
    private String createStorageConnectionString(ProcessContext context, BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy, String storageAccountName, String domainName) {
        final String accountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
        final String sasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();

        switch (blobStorageAuthenticationStrategy) {
            case STORAGE_ACCOUNT_KEY:
                return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, accountKey, domainName);
            case SHARED_ACCESS_SIGNATURE:
                return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, sasToken);
            case OAUTH2:
                throw new IllegalArgumentException(String.format("Blob Storage Authentication Strategy %s does not support connection string authentication", blobStorageAuthenticationStrategy));
            default:
                // Reached only for a strategy constant not listed above.
                throw new MatchException(null, null);
        }
    }

    /**
     * Formats the Blob service endpoint URL for a storage account.
     *
     * @param storageAccountName storage account name placed in front of {@code .blob.core.}
     * @param domainName         domain name placed after {@code .blob.core.}
     * @return {@code https://<storageAccountName>.blob.core.<domainName>/}
     */
    private String createBlobEndpoint(String storageAccountName, String domainName) {
        final String endpointTemplate = "https://%s.blob.core.%s/";
        return endpointTemplate.formatted(storageAccountName, domainName);
    }

    /**
     * Derives the storage domain name from the Service Bus endpoint by removing every occurrence
     * of the literal {@code .servicebus.}.
     *
     * @param serviceBusEndpoint configured Service Bus endpoint
     * @return the endpoint with {@code .servicebus.} removed; unchanged when the literal is absent
     */
    private String getStorageDomainName(String serviceBusEndpoint) {
        final String serviceBusMarker = ".servicebus.";
        final String nothing = "";
        return serviceBusEndpoint.replace(serviceBusMarker, nothing);
    }

    /**
     * Loads the initial partition event positions from a {@code LegacyBlobStorageEventPositionProvider}
     * and logs each loaded position at info level.
     *
     * @param blobContainerAsyncClient blob container client handed to the provider
     * @param consumerGroup            consumer group handed to the provider
     * @return the map of partition id to event position returned by the provider
     */
    private Map<String, EventPosition> getLegacyPartitionEventPosition(BlobContainerAsyncClient blobContainerAsyncClient, String consumerGroup) {
        final Map<String, EventPosition> legacyPositions =
                new LegacyBlobStorageEventPositionProvider(blobContainerAsyncClient, consumerGroup).getInitialPartitionEventPosition();

        legacyPositions.forEach((partitionId, eventPosition) ->
                getLogger().info("Loaded Event Position [{}] for Partition [{}] from Legacy Checkpoint Storage", eventPosition, partitionId));

        return legacyPositions;
    }
}

