/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.coordinator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.SerializedValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.coordinator.CoordinationResponseUtils;
import org.apache.paimon.flink.sink.coordinator.LatestIdentifierRequest;
import org.apache.paimon.flink.sink.coordinator.LatestIdentifierResponse;
import org.apache.paimon.flink.sink.coordinator.PagedCoordinationRequest;
import org.apache.paimon.flink.sink.coordinator.PagedCoordinationResponse;
import org.apache.paimon.flink.sink.coordinator.ScanCoordinationRequest;
import org.apache.paimon.flink.sink.coordinator.ScanCoordinationResponse;
import org.apache.paimon.operation.RestoreFiles;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.utils.ArrayUtils;
import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.SerializationUtils;

public class CoordinatedWriteRestore
implements WriteRestore {
    private final TaskOperatorEventGateway gateway;
    private final OperatorID operatorID;

    public CoordinatedWriteRestore(TaskOperatorEventGateway gateway, OperatorID operatorID) {
        this.gateway = gateway;
        this.operatorID = operatorID;
    }

    @Override
    public long latestCommittedIdentifier(String user) {
        LatestIdentifierRequest request = new LatestIdentifierRequest(user);
        try {
            SerializedValue serializedRequest = new SerializedValue((Object)request);
            LatestIdentifierResponse response = (LatestIdentifierResponse)CoordinationResponseUtils.unwrap((CoordinationResponse)this.gateway.sendRequestToCoordinator(this.operatorID, serializedRequest).get());
            return response.latestIdentifier();
        }
        catch (IOException | InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public RestoreFiles restoreFiles(BinaryRow partition, int bucket, boolean scanDynamicBucketIndex, boolean scanDeleteVectorsIndex) {
        ScanCoordinationRequest coordinationRequest = new ScanCoordinationRequest(SerializationUtils.serializeBinaryRow(partition), bucket, scanDynamicBucketIndex, scanDeleteVectorsIndex);
        try {
            PagedCoordinationResponse response;
            byte[] requestContent = InstantiationUtil.serializeObject(coordinationRequest);
            Integer nextPageToken = null;
            ArrayList<byte[]> result = new ArrayList<byte[]>();
            String uuid = UUID.randomUUID().toString();
            do {
                PagedCoordinationRequest request = new PagedCoordinationRequest(requestContent, uuid, nextPageToken);
                SerializedValue serializedRequest = new SerializedValue((Object)request);
                response = (PagedCoordinationResponse)CoordinationResponseUtils.unwrap((CoordinationResponse)this.gateway.sendRequestToCoordinator(this.operatorID, serializedRequest).get());
                result.add(response.content());
            } while ((nextPageToken = response.nextPageToken()) != null);
            byte[] responseContent = ArrayUtils.mergeByteArrays(result);
            ScanCoordinationResponse response2 = (ScanCoordinationResponse)InstantiationUtil.deserializeObject(responseContent, this.getClass().getClassLoader());
            return new RestoreFiles(response2.snapshot(), response2.totalBuckets(), response2.extractDataFiles(), response2.extractDynamicBucketIndex(), response2.extractDeletionVectorsIndex());
        }
        catch (IOException | ClassNotFoundException | InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

