/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.aws.dynamodb;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LockManagers;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;

/**
 * Lock manager that stores locks as items in a DynamoDB table.
 *
 * <p>Each lock is a single item keyed by {@code entityId} that also records the {@code ownerId},
 * a random {@code version} string and the {@code leaseDurationMs}. While a lock is held, a
 * scheduled heartbeat rewrites the item (producing a new version) so that competing acquirers can
 * tell the holder is still alive.
 *
 * <p>NOTE(review): the heartbeat registry is a plain {@code HashMap} with no synchronization in
 * this class; confirm how callers share an instance across threads.
 */
public class DynamoDbLockManager
extends LockManagers.BaseLockManager {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDbLockManager.class);
    private static final String COL_LOCK_ENTITY_ID = "entityId";
    private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
    private static final String COL_VERSION = "version";
    private static final String COL_LOCK_OWNER_ID = "ownerId";
    // Condition: the stored item belongs to the given entity and owner.
    private static final String CONDITION_LOCK_ID_MATCH = String.format("%s = :eid AND %s = :oid", COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
    // Condition: no item exists yet for the entity.
    private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format("attribute_not_exists(%s)", COL_LOCK_ENTITY_ID);
    // Condition: no item exists, or the existing item still carries the version that was read earlier.
    private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format("attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)", COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
    private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
    private static final int RELEASE_RETRY_ATTEMPTS_MAX = 5;
    private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList(KeySchemaElement.builder().attributeName(COL_LOCK_ENTITY_ID).keyType(KeyType.HASH).build());
    private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList(AttributeDefinition.builder().attributeName(COL_LOCK_ENTITY_ID).attributeType(ScalarAttributeType.S).build());
    // Active heartbeat per entity id.
    private final Map<String, DynamoDbHeartbeat> heartbeats = Maps.newHashMap();
    private DynamoDbClient dynamo;
    private String lockTableName;

    /**
     * Creates an uninitialized manager; {@link #initialize(Map)} must be called before use.
     */
    public DynamoDbLockManager() {
    }

    /**
     * Creates a manager bound to the given client and table, creating the table if it is missing.
     *
     * @param dynamo DynamoDB client used for all lock operations
     * @param lockTableName name of the DynamoDB table that stores lock items
     */
    public DynamoDbLockManager(DynamoDbClient dynamo, String lockTableName) {
        super.initialize(Maps.newHashMap());
        this.dynamo = dynamo;
        this.lockTableName = lockTableName;
        this.ensureLockTableExistsOrCreate();
    }

    /**
     * Creates the lock table (pay-per-request billing) when it does not exist and then waits,
     * with retries, until DynamoDB reports it as active.
     */
    private void ensureLockTableExistsOrCreate() {
        if (this.tableExists(this.lockTableName)) {
            return;
        }
        LOG.info("Dynamo lock table {} not found, trying to create", this.lockTableName);
        this.dynamo.createTable(
            CreateTableRequest.builder()
                .tableName(this.lockTableName)
                .keySchema(DynamoDbLockManager.lockTableSchema())
                .attributeDefinitions(DynamoDbLockManager.lockTableColDefinitions())
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .build());
        // checkTableActive signals "not ready yet" with IllegalStateException, which is the only
        // failure that is retried here.
        Tasks.foreach(this.lockTableName)
            .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX)
            .throwFailureWhenFinished()
            .onlyRetryOn(IllegalStateException.class)
            .run(this::checkTableActive);
    }

    /**
     * Checks whether a table can be described.
     *
     * @param tableName table to look up
     * @return {@code true} if describe succeeds, {@code false} if DynamoDB reports the table as
     *     not found
     */
    @VisibleForTesting
    boolean tableExists(String tableName) {
        try {
            this.dynamo.describeTable(DescribeTableRequest.builder().tableName(tableName).build());
            return true;
        }
        catch (ResourceNotFoundException e) {
            return false;
        }
    }

    /**
     * Verifies that the table exists and is in {@code ACTIVE} status.
     *
     * @param tableName table to check
     * @throws IllegalStateException if the table is missing or not yet active
     */
    private void checkTableActive(String tableName) {
        try {
            DescribeTableResponse response = this.dynamo.describeTable(DescribeTableRequest.builder().tableName(tableName).build());
            TableStatus currentStatus = response.table().tableStatus();
            if (!currentStatus.equals(TableStatus.ACTIVE)) {
                throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s", tableName, currentStatus));
            }
        }
        catch (ResourceNotFoundException e) {
            // Keep the SDK exception as the cause so the request details are not lost.
            throw new IllegalStateException(String.format("Cannot find Dynamo table %s", tableName), e);
        }
    }

    /**
     * Initializes the manager from catalog properties.
     *
     * @param properties configuration; must contain the {@code lock.table} entry
     * @throws NullPointerException if {@code lock.table} is not set
     */
    @Override
    public void initialize(Map<String, String> properties) {
        super.initialize(properties);
        // Validate the table name before building a client so a misconfiguration does not leave
        // an unclosed client behind.
        String tableName = properties.get("lock.table");
        Preconditions.checkNotNull(tableName, "DynamoDB lock table name must not be null");
        this.lockTableName = tableName;
        this.dynamo = AwsClientFactories.from(properties).dynamo();
        this.ensureLockTableExistsOrCreate();
    }

    /**
     * Tries to acquire the lock, retrying {@link #acquireOnce(String, String)} with the backoff
     * settings supplied by the base class accessors.
     *
     * @param entityId id of the entity to lock
     * @param ownerId id of the owner requesting the lock
     * @return {@code true} once the lock item was written, {@code false} if the attempts ended
     *     with a {@link DynamoDbException}
     */
    @Override
    public boolean acquire(String entityId, String ownerId) {
        try {
            Tasks.foreach(entityId)
                .throwFailureWhenFinished()
                .retry(Integer.MAX_VALUE - 1)
                .exponentialBackoff(this.acquireIntervalMs(), this.acquireIntervalMs(), this.acquireTimeoutMs(), 1.0)
                .onlyRetryOn(
                    ConditionalCheckFailedException.class,
                    ProvisionedThroughputExceededException.class,
                    TransactionConflictException.class,
                    RequestLimitExceededException.class,
                    InternalServerErrorException.class)
                .run(id -> this.acquireOnce(id, ownerId));
            return true;
        }
        catch (DynamoDbException e) {
            return false;
        }
    }

    /**
     * Performs a single acquisition attempt.
     *
     * <p>If no item exists for the entity, a new one is written on condition that it still does
     * not exist. Otherwise the method sleeps for the lease duration stored in the existing item
     * and then writes a new item on condition that the item is gone or its version is unchanged
     * since it was read. On success a heartbeat is started for the new owner.
     *
     * @param entityId id of the entity to lock
     * @param ownerId id of the owner requesting the lock
     * @throws IllegalStateException if the thread is interrupted while waiting out the lease
     */
    @VisibleForTesting
    void acquireOnce(String entityId, String ownerId) {
        GetItemResponse response = this.dynamo.getItem(
            GetItemRequest.builder()
                .tableName(this.lockTableName)
                .consistentRead(true)
                .key(DynamoDbLockManager.toKey(entityId))
                .build());
        if (!response.hasItem()) {
            this.dynamo.putItem(
                PutItemRequest.builder()
                    .tableName(this.lockTableName)
                    .item(DynamoDbLockManager.toNewItem(entityId, ownerId, this.heartbeatTimeoutMs()))
                    .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST)
                    .build());
        } else {
            Map<String, AttributeValue> currentItem = response.item();
            try {
                // Wait out the recorded lease; a live holder's heartbeat rewrites the item with
                // a new version in the meantime, which makes the conditional put below fail.
                Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n()));
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e);
            }
            this.dynamo.putItem(
                PutItemRequest.builder()
                    .tableName(this.lockTableName)
                    .item(DynamoDbLockManager.toNewItem(entityId, ownerId, this.heartbeatTimeoutMs()))
                    .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH)
                    .expressionAttributeValues(ImmutableMap.of(
                        ":eid", AttributeValue.builder().s(entityId).build(),
                        ":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build()))
                    .build());
        }
        this.startNewHeartbeat(entityId, ownerId);
    }

    /**
     * Replaces any registered heartbeat for the entity with a freshly scheduled one.
     */
    private void startNewHeartbeat(String entityId, String ownerId) {
        DynamoDbHeartbeat previous = this.heartbeats.remove(entityId);
        if (previous != null) {
            previous.cancel();
        }
        DynamoDbHeartbeat heartbeat = new DynamoDbHeartbeat(this.dynamo, this.lockTableName, this.heartbeatIntervalMs(), this.heartbeatTimeoutMs(), entityId, ownerId);
        heartbeat.schedule(this.scheduler());
        this.heartbeats.put(entityId, heartbeat);
    }

    /**
     * Releases the lock by deleting its item, provided the item still belongs to the owner.
     *
     * <p>The heartbeat registered for the entity is cancelled and unregistered when it belongs to
     * the same owner, regardless of whether the delete succeeded.
     *
     * @param entityId id of the locked entity
     * @param ownerId id of the owner releasing the lock
     * @return {@code true} if the item was deleted, {@code false} if the delete failed with a
     *     {@link DynamoDbException} (the failure is logged)
     */
    @Override
    public boolean release(String entityId, String ownerId) {
        boolean succeeded = false;
        DynamoDbHeartbeat heartbeat = this.heartbeats.get(entityId);
        try {
            Tasks.foreach(entityId)
                .retry(RELEASE_RETRY_ATTEMPTS_MAX)
                .throwFailureWhenFinished()
                .onlyRetryOn(
                    ProvisionedThroughputExceededException.class,
                    TransactionConflictException.class,
                    RequestLimitExceededException.class,
                    InternalServerErrorException.class)
                .run(id -> this.dynamo.deleteItem(
                    DeleteItemRequest.builder()
                        .tableName(this.lockTableName)
                        .key(DynamoDbLockManager.toKey(id))
                        .conditionExpression(CONDITION_LOCK_ID_MATCH)
                        .expressionAttributeValues(DynamoDbLockManager.toLockIdValues(id, ownerId))
                        .build()));
            succeeded = true;
        }
        catch (ConditionalCheckFailedException e) {
            LOG.error("Failed to release lock for entity: {}, owner: {}, lock entity does not exist or owner not match", entityId, ownerId, e);
        }
        catch (DynamoDbException e) {
            LOG.error("Failed to release lock for entity: {}, owner: {}, encountered unexpected DynamoDB exception", entityId, ownerId, e);
        }
        finally {
            if (heartbeat != null && heartbeat.ownerId().equals(ownerId)) {
                heartbeat.cancel();
                // Drop the cancelled heartbeat so the registry does not accumulate dead entries;
                // the two-argument remove leaves a different heartbeat for this entity untouched.
                this.heartbeats.remove(entityId, heartbeat);
            }
        }
        return succeeded;
    }

    /**
     * Builds the primary key map for an entity.
     */
    private static Map<String, AttributeValue> toKey(String entityId) {
        return ImmutableMap.of(COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build());
    }

    /**
     * Builds a full lock item with a freshly generated random version.
     */
    private static Map<String, AttributeValue> toNewItem(String entityId, String ownerId, long heartbeatTimeoutMs) {
        return ImmutableMap.of(
            COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build(),
            COL_LOCK_OWNER_ID, AttributeValue.builder().s(ownerId).build(),
            COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build(),
            COL_LEASE_DURATION_MS, AttributeValue.builder().n(Long.toString(heartbeatTimeoutMs)).build());
    }

    /**
     * Builds the {@code :eid}/{@code :oid} placeholder values used by the id-match condition.
     */
    private static Map<String, AttributeValue> toLockIdValues(String entityId, String ownerId) {
        return ImmutableMap.of(
            ":eid", AttributeValue.builder().s(entityId).build(),
            ":oid", AttributeValue.builder().s(ownerId).build());
    }

    /**
     * Cancels all registered heartbeats and closes the DynamoDB client.
     */
    @Override
    public void close() {
        // Stop scheduling heartbeats before closing the client they use.
        this.heartbeats.values().forEach(DynamoDbHeartbeat::cancel);
        this.heartbeats.clear();
        // The client is null when the no-arg constructor was used and initialize was never called.
        if (this.dynamo != null) {
            this.dynamo.close();
        }
    }

    /**
     * Returns the key schema of the lock table: {@code entityId} as the hash key.
     *
     * @return the key schema elements used when creating the lock table
     */
    public static List<KeySchemaElement> lockTableSchema() {
        return LOCK_TABLE_SCHEMA;
    }

    /**
     * Returns the attribute definitions of the lock table: {@code entityId} as a string.
     *
     * @return the attribute definitions used when creating the lock table
     */
    public static List<AttributeDefinition> lockTableColDefinitions() {
        return LOCK_TABLE_COL_DEFINITIONS;
    }

    /**
     * Periodic task that rewrites the lock item for one entity/owner pair while the lock is held.
     */
    private static class DynamoDbHeartbeat
    implements Runnable {
        private final DynamoDbClient dynamo;
        private final String lockTableName;
        private final long intervalMs;
        private final long timeoutMs;
        private final String entityId;
        private final String ownerId;
        // Handle of the scheduled task; null until schedule() is called.
        private ScheduledFuture<?> future;

        DynamoDbHeartbeat(DynamoDbClient dynamo, String lockTableName, long intervalMs, long timeoutMs, String entityId, String ownerId) {
            this.dynamo = dynamo;
            this.lockTableName = lockTableName;
            this.intervalMs = intervalMs;
            this.timeoutMs = timeoutMs;
            this.entityId = entityId;
            this.ownerId = ownerId;
            this.future = null;
        }

        /**
         * Rewrites the lock item on condition that it still belongs to this entity and owner.
         * Failures are logged rather than thrown, since an exception escaping a fixed-rate task
         * suppresses its subsequent executions.
         */
        @Override
        public void run() {
            try {
                this.dynamo.putItem(
                    PutItemRequest.builder()
                        .tableName(this.lockTableName)
                        .item(DynamoDbLockManager.toNewItem(this.entityId, this.ownerId, this.timeoutMs))
                        .conditionExpression(CONDITION_LOCK_ID_MATCH)
                        .expressionAttributeValues(DynamoDbLockManager.toLockIdValues(this.entityId, this.ownerId))
                        .build());
            }
            catch (ConditionalCheckFailedException e) {
                LOG.error("Fail to heartbeat for entity: {}, owner: {} due to conditional check failure, unsafe concurrent commits might be going on", this.entityId, this.ownerId, e);
            }
            catch (RuntimeException e) {
                LOG.error("Failed to heartbeat for entity: {}, owner: {}", this.entityId, this.ownerId, e);
            }
        }

        /**
         * Returns the owner this heartbeat was created for.
         */
        public String ownerId() {
            return this.ownerId;
        }

        /**
         * Schedules this heartbeat at a fixed rate of {@code intervalMs}, starting immediately.
         */
        public void schedule(ScheduledExecutorService scheduler) {
            this.future = scheduler.scheduleAtFixedRate(this, 0L, this.intervalMs, TimeUnit.MILLISECONDS);
        }

        /**
         * Cancels future executions without interrupting a run already in progress; a no-op if
         * the heartbeat was never scheduled.
         */
        public void cancel() {
            if (this.future != null) {
                this.future.cancel(false);
            }
        }
    }
}

