/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.artifact;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.LegacyArtifactStagingServiceGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.MessageOrBuilder;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hasher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractLegacyArtifactStagingService
extends LegacyArtifactStagingServiceGrpc.LegacyArtifactStagingServiceImplBase
implements FnService {
    public static final String NO_ARTIFACTS_STAGED_TOKEN = (String)ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant);
    private static final Logger LOG = LoggerFactory.getLogger(AbstractLegacyArtifactStagingService.class);
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    public abstract String getArtifactUri(String var1, String var2) throws Exception;

    public abstract WritableByteChannel openUri(String var1) throws IOException;

    public abstract void removeUri(String var1) throws IOException;

    public abstract WritableByteChannel openManifest(String var1) throws Exception;

    public abstract void removeArtifacts(String var1) throws Exception;

    public abstract String getRetrievalToken(String var1) throws Exception;

    public StreamObserver<ArtifactApi.PutArtifactRequest> putArtifact(StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver) {
        return new PutArtifactStreamObserver(responseObserver);
    }

    public void commitManifest(ArtifactApi.CommitManifestRequest request, StreamObserver<ArtifactApi.CommitManifestResponse> responseObserver) {
        try {
            String retrievalToken;
            if (request.getManifest().getArtifactCount() > 0) {
                String stagingSessionToken = request.getStagingSessionToken();
                ArtifactApi.ProxyManifest.Builder proxyManifestBuilder = ArtifactApi.ProxyManifest.newBuilder().setManifest(request.getManifest());
                for (ArtifactApi.ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
                    proxyManifestBuilder.addLocation(ArtifactApi.ProxyManifest.Location.newBuilder().setName(artifactMetadata.getName()).setUri(this.getArtifactUri(stagingSessionToken, this.encodedFileName(artifactMetadata))).build());
                }
                try (WritableByteChannel manifestWritableByteChannel = this.openManifest(stagingSessionToken);){
                    manifestWritableByteChannel.write(CHARSET.encode(JsonFormat.printer().print((MessageOrBuilder)proxyManifestBuilder.build())));
                }
                retrievalToken = this.getRetrievalToken(stagingSessionToken);
            } else {
                retrievalToken = NO_ARTIFACTS_STAGED_TOKEN;
            }
            responseObserver.onNext((Object)ArtifactApi.CommitManifestResponse.newBuilder().setRetrievalToken(retrievalToken).build());
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            LOG.error("Unable to commit manifest.", (Throwable)e);
            responseObserver.onError((Throwable)e);
        }
    }

    @Override
    public void close() throws Exception {
    }

    private String encodedFileName(ArtifactApi.ArtifactMetadata artifactMetadata) {
        return "artifact_" + Hashing.sha256().hashString((CharSequence)artifactMetadata.getName(), CHARSET).toString();
    }

    private class PutArtifactStreamObserver
    implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final StreamObserver<ArtifactApi.PutArtifactResponse> outboundObserver;
        private ArtifactApi.PutArtifactMetadata metadata;
        private String artifactId;
        private WritableByteChannel artifactWritableByteChannel;
        private Hasher hasher;

        PutArtifactStreamObserver(StreamObserver<ArtifactApi.PutArtifactResponse> outboundObserver) {
            this.outboundObserver = outboundObserver;
        }

        public void onNext(ArtifactApi.PutArtifactRequest putArtifactRequest) {
            if (this.metadata == null) {
                Preconditions.checkNotNull((Object)putArtifactRequest);
                Preconditions.checkNotNull((Object)putArtifactRequest.getMetadata());
                this.metadata = putArtifactRequest.getMetadata();
                LOG.debug("stored metadata: {}", (Object)this.metadata);
                try {
                    this.artifactId = AbstractLegacyArtifactStagingService.this.getArtifactUri(putArtifactRequest.getMetadata().getStagingSessionToken(), AbstractLegacyArtifactStagingService.this.encodedFileName(this.metadata.getMetadata()));
                    LOG.debug("Going to stage artifact {} to {}.", (Object)this.metadata.getMetadata().getName(), (Object)this.artifactId);
                    this.artifactWritableByteChannel = AbstractLegacyArtifactStagingService.this.openUri(this.artifactId);
                    this.hasher = Hashing.sha256().newHasher();
                }
                catch (Exception e) {
                    String message = String.format("Failed to begin staging artifact %s", this.metadata.getMetadata().getName());
                    LOG.error(message, (Throwable)e);
                    this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause((Throwable)e)));
                }
            } else {
                try {
                    ByteString data = putArtifactRequest.getData().getData();
                    this.artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());
                    this.hasher.putBytes(data.toByteArray());
                }
                catch (IOException e) {
                    String message = String.format("Failed to write chunk of artifact %s to %s", this.metadata.getMetadata().getName(), this.artifactId);
                    LOG.error(message, (Throwable)e);
                    this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause((Throwable)e)));
                }
            }
        }

        public void onError(Throwable throwable) {
            LOG.error("Staging artifact failed for " + this.artifactId, throwable);
            try {
                if (this.artifactWritableByteChannel != null) {
                    this.artifactWritableByteChannel.close();
                }
                if (this.artifactId != null) {
                    AbstractLegacyArtifactStagingService.this.removeUri(this.artifactId);
                }
            }
            catch (IOException e) {
                this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to clean up artifact file %s", this.artifactId))));
                return;
            }
            this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to stage artifact %s", this.artifactId)).withCause(throwable)));
        }

        public void onCompleted() {
            String actualSha256;
            String expectedSha256;
            LOG.debug("Staging artifact completed for " + this.artifactId);
            if (this.artifactWritableByteChannel != null) {
                try {
                    this.artifactWritableByteChannel.close();
                }
                catch (IOException e) {
                    this.onError(e);
                    return;
                }
            }
            if ((expectedSha256 = this.metadata.getMetadata().getSha256()) != null && !expectedSha256.isEmpty() && !(actualSha256 = this.hasher.hash().toString()).equals(expectedSha256)) {
                this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(String.format("Artifact %s is corrupt: expected sah256 %s, but has sha256 %s", this.metadata.getMetadata().getName(), expectedSha256, actualSha256))));
                return;
            }
            this.outboundObserver.onNext((Object)ArtifactApi.PutArtifactResponse.newBuilder().build());
            this.outboundObserver.onCompleted();
        }
    }
}

