/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import lombok.Generated;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.auth.FunctionAuthUtils;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionFilePackage;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.service.api.Sinks;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinksImpl
extends ComponentImpl
implements Sinks<PulsarWorkerService> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SinksImpl.class);

    public SinksImpl(Supplier<PulsarWorkerService> workerServiceSupplier) {
        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerSink(String tenant, String namespace, String sinkName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String sinkPkgUrl, SinkConfig sinkConfig, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (tenant == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (namespace == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (sinkName == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
        }
        if (sinkConfig == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, sinkName, "register", authParams);
        try {
            this.worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
            String qualifiedNamespace = tenant + "/" + namespace;
            List namespaces = this.worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant, this.worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
                    log.error("{}/{}/{} Namespace {} does not exist", new Object[]{tenant, namespace, sinkName, namespace});
                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
                }
            }
        }
        catch (PulsarAdminException.NotAuthorizedException e) {
            log.error("{}/{}/{} Client is not authorized to operate {} on tenant", new Object[]{tenant, namespace, sinkName, ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType)});
            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
        }
        catch (PulsarAdminException.NotFoundException e) {
            log.error("{}/{}/{} Tenant {} does not exist", new Object[]{tenant, namespace, sinkName, tenant});
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Issues getting tenant data", new Object[]{tenant, namespace, sinkName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
            log.error("{} {}/{}/{} already exists", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName});
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName));
        }
        File componentPackageFile = null;
        try {
            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
            Function.FunctionDetails functionDetails;
            try {
                if (org.apache.commons.lang3.StringUtils.isNotBlank((CharSequence)sinkPkgUrl)) {
                    componentPackageFile = this.getPackageFile(this.componentType, sinkPkgUrl);
                    functionDetails = this.validateUpdateRequestParams(tenant, namespace, sinkName, sinkConfig, componentPackageFile);
                } else {
                    if (uploadedInputStream != null) {
                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
                    }
                    if (!(FunctionCommon.isFunctionCodeBuiltin((Function.FunctionDetailsOrBuilder)(functionDetails = this.validateUpdateRequestParams(tenant, namespace, sinkName, sinkConfig, componentPackageFile))) || componentPackageFile != null && fileDetail != null)) {
                        throw new IllegalArgumentException(ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType) + " Package is not provided");
                    }
                }
            }
            catch (Exception e) {
                log.error("Invalid register {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName, e});
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }
            try {
                this.worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
            }
            catch (Exception e) {
                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName});
                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName, e.getMessage()));
            }
            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
            if (this.worker().getWorkerConfig().isAuthenticationEnabled()) {
                Function.FunctionDetails finalFunctionDetails = functionDetails;
                this.worker().getFunctionRuntimeManager().getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> {
                    if (authParams.getClientAuthenticationDataSource() != null) {
                        try {
                            Optional functionAuthData = functionAuthProvider.cacheAuthData(finalFunctionDetails, authParams.getClientAuthenticationDataSource());
                            functionAuthData.ifPresent(authData -> functionMetaDataBuilder.setFunctionAuthSpec(Function.FunctionAuthenticationSpec.newBuilder().setData(ByteString.copyFrom((byte[])authData.getData())).build()));
                        }
                        catch (Exception e) {
                            log.error("Error caching authentication data for {} {}/{}/{}", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName, e});
                            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName, e.getMessage()));
                        }
                    }
                });
            }
            try {
                packageLocationMetaDataBuilder = this.getFunctionPackageLocation(functionMetaDataBuilder.build(), sinkPkgUrl, fileDetail, componentPackageFile);
            }
            catch (Exception e) {
                log.error("Failed process {} {}/{}/{} package: ", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName, e});
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
            String transformFunction = sinkConfig.getTransformFunction();
            if (org.apache.commons.lang3.StringUtils.isNotBlank((CharSequence)transformFunction)) {
                this.setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
            }
            this.updateRequest(null, functionMetaDataBuilder.build());
        }
        finally {
            if (componentPackageFile != null && componentPackageFile.exists() && (sinkPkgUrl == null || !sinkPkgUrl.startsWith("file"))) {
                componentPackageFile.delete();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updateSink(String tenant, String namespace, String sinkName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String sinkPkgUrl, SinkConfig sinkConfig, AuthenticationParameters authParams, UpdateOptionsImpl updateOptions) {
        SinkConfig mergedConfig;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (tenant == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (namespace == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (sinkName == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
        }
        if (sinkConfig == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, sinkName, "update", authParams);
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName));
        }
        Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);
        if (!InstanceUtils.calculateSubjectType((Function.FunctionDetails)existingComponent.getFunctionDetails()).equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, sinkName, ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName));
        }
        SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails((Function.FunctionDetails)existingComponent.getFunctionDetails());
        sinkConfig.setTenant(tenant);
        sinkConfig.setNamespace(namespace);
        sinkConfig.setName(sinkName);
        try {
            mergedConfig = SinkConfigUtils.validateUpdate((SinkConfig)existingSinkConfig, (SinkConfig)sinkConfig);
        }
        catch (Exception e) {
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        if (existingSinkConfig.equals((Object)mergedConfig) && org.apache.commons.lang3.StringUtils.isBlank((CharSequence)sinkPkgUrl) && uploadedInputStream == null && (updateOptions == null || !updateOptions.isUpdateAuthData())) {
            log.error("{}/{}/{} Update contains no changes", new Object[]{tenant, namespace, sinkName});
            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
        }
        File componentPackageFile = null;
        try {
            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
            Function.FunctionDetails functionDetails;
            try {
                componentPackageFile = this.getPackageFile(this.componentType, sinkPkgUrl, existingComponent.getPackageLocation().getPackagePath(), uploadedInputStream);
                functionDetails = this.validateUpdateRequestParams(tenant, namespace, sinkName, mergedConfig, componentPackageFile);
                if (existingComponent.getPackageLocation().getPackagePath().startsWith("builtin") && !FunctionCommon.isFunctionCodeBuiltin((Function.FunctionDetailsOrBuilder)functionDetails) && (componentPackageFile == null || fileDetail == null)) {
                    throw new IllegalArgumentException(ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType) + " Package is not provided");
                }
            }
            catch (Exception e) {
                log.error("Invalid update {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName, e});
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }
            try {
                this.worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
            }
            catch (Exception e) {
                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName});
                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName, e.getMessage()));
            }
            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent).setFunctionDetails(functionDetails);
            if (this.worker().getWorkerConfig().isAuthenticationEnabled()) {
                Function.FunctionDetails finalFunctionDetails = functionDetails;
                this.worker().getFunctionRuntimeManager().getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> {
                    if (authParams.getClientAuthenticationDataSource() != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
                        Optional<Object> existingFunctionAuthData = Optional.empty();
                        if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
                            existingFunctionAuthData = Optional.ofNullable(FunctionAuthUtils.getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
                        }
                        try {
                            Optional newFunctionAuthData = functionAuthProvider.updateAuthData(finalFunctionDetails, existingFunctionAuthData, authParams.getClientAuthenticationDataSource());
                            if (newFunctionAuthData.isPresent()) {
                                functionMetaDataBuilder.setFunctionAuthSpec(Function.FunctionAuthenticationSpec.newBuilder().setData(ByteString.copyFrom((byte[])((FunctionAuthData)newFunctionAuthData.get()).getData())).build());
                            } else {
                                functionMetaDataBuilder.clearFunctionAuthSpec();
                            }
                        }
                        catch (Exception e) {
                            log.error("Error updating authentication data for {} {}/{}/{}", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName, e});
                            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), sinkName, e.getMessage()));
                        }
                    }
                });
            }
            if (org.apache.commons.lang3.StringUtils.isNotBlank((CharSequence)sinkPkgUrl) || uploadedInputStream != null) {
                Function.FunctionMetaData metaData = functionMetaDataBuilder.build();
                metaData = FunctionMetaDataUtils.incrMetadataVersion((Function.FunctionMetaData)metaData, (Function.FunctionMetaData)metaData);
                try {
                    packageLocationMetaDataBuilder = this.getFunctionPackageLocation(metaData, sinkPkgUrl, fileDetail, componentPackageFile);
                }
                catch (Exception e) {
                    log.error("Failed process {} {}/{}/{} package: ", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, sinkName, e});
                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
                }
            } else {
                packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
            }
            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
            String transformFunction = mergedConfig.getTransformFunction();
            if (org.apache.commons.lang3.StringUtils.isNotBlank((CharSequence)transformFunction) && !transformFunction.equals(existingSinkConfig.getTransformFunction())) {
                this.setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
            }
            this.updateRequest(existingComponent, functionMetaDataBuilder.build());
        }
        finally {
            if (componentPackageFile != null && componentPackageFile.exists() && (sinkPkgUrl != null && !sinkPkgUrl.startsWith("file") || uploadedInputStream != null)) {
                componentPackageFile.delete();
            }
        }
    }

    private void setTransformFunctionPackageLocation(Function.FunctionMetaData.Builder functionMetaDataBuilder, Function.FunctionDetails functionDetails, String transformFunction) {
        File functionPackageFile = null;
        try {
            String builtin = functionDetails.getBuiltin();
            if (org.apache.commons.lang3.StringUtils.isBlank((CharSequence)builtin)) {
                functionPackageFile = this.getPackageFile(Function.FunctionDetails.ComponentType.FUNCTION, transformFunction);
            }
            Function.PackageLocationMetaData.Builder functionPackageLocation = this.getFunctionPackageLocation(functionMetaDataBuilder.build(), transformFunction, null, functionPackageFile, functionDetails.getName() + "__sink-function", Function.FunctionDetails.ComponentType.FUNCTION, builtin);
            functionMetaDataBuilder.setTransformFunctionPackageLocation(functionPackageLocation);
        }
        catch (Exception e) {
            log.error("Failed process {} {}/{}/{} extra function package: ", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        finally {
            if (functionPackageFile != null && functionPackageFile.exists() && !transformFunction.startsWith("file")) {
                functionPackageFile.delete();
            }
        }
    }

    private ExceptionInformation getExceptionInformation(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry) {
        ExceptionInformation exceptionInformation = new ExceptionInformation();
        exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
        exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
        return exceptionInformation;
    }

    @Override
    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(String tenant, String namespace, String sinkName, String instanceId, URI uri, AuthenticationParameters authParams) {
        SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
        this.componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId), authParams);
        try {
            sinkInstanceStatusData = (SinkStatus.SinkInstanceStatus.SinkInstanceStatusData)new GetSinkStatus().getComponentInstanceStatus(tenant, namespace, sinkName, Integer.parseInt(instanceId), uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, sinkName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return sinkInstanceStatusData;
    }

    @Override
    public SinkStatus getSinkStatus(String tenant, String namespace, String componentName, URI uri, AuthenticationParameters authParams) {
        SinkStatus sinkStatus;
        this.componentStatusRequestValidate(tenant, namespace, componentName, authParams);
        try {
            sinkStatus = (SinkStatus)new GetSinkStatus().getComponentStatus(tenant, namespace, componentName, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return sinkStatus;
    }

    @Override
    public SinkConfig getSinkInfo(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        this.componentStatusRequestValidate(tenant, namespace, componentName, authParams);
        Function.FunctionMetaData functionMetaData = this.worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, componentName);
        return SinkConfigUtils.convertFromDetails((Function.FunctionDetails)functionMetaData.getFunctionDetails());
    }

    @Override
    public List<ConnectorDefinition> getSinkList() {
        List<ConnectorDefinition> connectorDefinitions = this.getListOfConnectors();
        ArrayList<ConnectorDefinition> retval = new ArrayList<ConnectorDefinition>();
        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
            if (StringUtils.isEmpty((String)connectorDefinition.getSinkClass())) continue;
            retval.add(connectorDefinition);
        }
        return retval;
    }

    @Override
    public List<ConfigFieldDefinition> getSinkConfigDefinition(String name) {
        List retval;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if ((retval = this.worker().getConnectorsManager().getSinkConfigDefinition(name)) == null) {
            throw new RestException(Response.Status.NOT_FOUND, "builtin sink does not exist");
        }
        return retval;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Function.FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String sinkName, SinkConfig sinkConfig, File sinkPackageFile) throws IOException, PulsarAdminException {
        String archive;
        sinkConfig.setTenant(tenant);
        sinkConfig.setNamespace(namespace);
        sinkConfig.setName(sinkName);
        Utils.inferMissingArguments((SinkConfig)sinkConfig);
        ValidatableFunctionPackage connectorFunctionPackage = null;
        if (!org.apache.commons.lang3.StringUtils.isEmpty((CharSequence)sinkConfig.getArchive()) && (archive = sinkConfig.getArchive()).startsWith("builtin")) {
            archive = archive.replaceFirst("^builtin://", "");
            Connector connector = this.worker().getConnectorsManager().getConnector(archive);
            if (connector == null) {
                throw new IllegalArgumentException("Built-in sink is not available");
            }
            connectorFunctionPackage = connector.getConnectorFunctionPackage();
        }
        boolean shouldCloseFunctionPackage = false;
        ValidatableFunctionPackage transformFunctionPackage = null;
        boolean shouldCloseTransformFunctionPackage = false;
        try {
            WorkerConfig workerConfig = this.worker().getWorkerConfig();
            if (connectorFunctionPackage == null && sinkPackageFile != null) {
                connectorFunctionPackage = new FunctionFilePackage(sinkPackageFile, workerConfig.getNarExtractionDirectory(), workerConfig.getEnableClassloadingOfExternalFiles().booleanValue(), ConnectorDefinition.class);
                shouldCloseFunctionPackage = true;
            }
            if (connectorFunctionPackage == null) {
                throw new IllegalArgumentException("Sink package is not provided");
            }
            if (org.apache.commons.lang3.StringUtils.isNotBlank((CharSequence)sinkConfig.getTransformFunction())) {
                transformFunctionPackage = this.getBuiltinFunctionPackage(sinkConfig.getTransformFunction());
                if (transformFunctionPackage == null) {
                    File functionPackageFile = this.getPackageFile(Function.FunctionDetails.ComponentType.FUNCTION, sinkConfig.getTransformFunction());
                    transformFunctionPackage = new FunctionFilePackage(functionPackageFile, workerConfig.getNarExtractionDirectory(), workerConfig.getEnableClassloadingOfExternalFiles().booleanValue(), ConnectorDefinition.class);
                    shouldCloseTransformFunctionPackage = true;
                }
                if (transformFunctionPackage == null) {
                    throw new IllegalArgumentException("Transform Function package not found");
                }
            }
            SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails((SinkConfig)sinkConfig, (ValidatableFunctionPackage)connectorFunctionPackage, (ValidatableFunctionPackage)transformFunctionPackage, (boolean)workerConfig.getValidateConnectorConfig());
            Function.FunctionDetails functionDetails = SinkConfigUtils.convert((SinkConfig)sinkConfig, (SinkConfigUtils.ExtractedSinkDetails)sinkDetails);
            return functionDetails;
        }
        finally {
            if (shouldCloseFunctionPackage && connectorFunctionPackage instanceof AutoCloseable) {
                try {
                    ((AutoCloseable)connectorFunctionPackage).close();
                }
                catch (Exception e) {
                    log.error("Failed to connector function file", (Throwable)e);
                }
            }
            if (shouldCloseTransformFunctionPackage && transformFunctionPackage instanceof AutoCloseable) {
                try {
                    ((AutoCloseable)transformFunctionPackage).close();
                }
                catch (Exception e) {
                    log.error("Failed to close transform function file", (Throwable)e);
                }
            }
        }
    }

    private class GetSinkStatus
    extends ComponentImpl.GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
        private GetSinkStatus() {
        }

        @Override
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notScheduledInstance() {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            sinkInstanceStatusData.setError("Sink has not been scheduled");
            return sinkInstanceStatusData;
        }

        @Override
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId) {
            ExceptionInformation exceptionInformation;
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(status.getRunning());
            sinkInstanceStatusData.setError(status.getFailureException());
            sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
            sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived());
            sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions() + status.getNumUserExceptions() + status.getNumSourceExceptions());
            LinkedList<ExceptionInformation> systemExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
                exceptionInformation = SinksImpl.this.getExceptionInformation(exceptionEntry);
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                exceptionInformation = SinksImpl.this.getExceptionInformation(exceptionEntry);
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
                exceptionInformation = SinksImpl.this.getExceptionInformation(exceptionEntry);
                systemExceptionInformationList.add(exceptionInformation);
            }
            sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
            sinkInstanceStatusData.setNumSinkExceptions(status.getNumSinkExceptions());
            LinkedList<ExceptionInformation> sinkExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
                ExceptionInformation exceptionInformation2 = SinksImpl.this.getExceptionInformation(exceptionEntry);
                sinkExceptionInformationList.add(exceptionInformation2);
            }
            sinkInstanceStatusData.setLatestSinkExceptions(sinkExceptionInformationList);
            sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed());
            sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
            return sinkInstanceStatusData;
        }

        @Override
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notRunning(String assignedWorkerId, String error) {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            if (error != null) {
                sinkInstanceStatusData.setError(error);
            }
            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
            return sinkInstanceStatusData;
        }

        @Override
        public SinkStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
            SinkStatus sinkStatus = new SinkStatus();
            for (Function.Assignment assignment : assignments) {
                boolean isOwner = SinksImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = isOwner ? (SinkStatus.SinkInstanceStatus.SinkInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null) : SinksImpl.this.worker().getFunctionAdmin().sinks().getSinkStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
                SinkStatus.SinkInstanceStatus instanceStatus = new SinkStatus.SinkInstanceStatus();
                instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
                instanceStatus.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(instanceStatus);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
                if (sinkInstanceStatus.getStatus().isRunning()) {
                    ++sinkStatus.numRunning;
                }
            });
            return sinkStatus;
        }

        @Override
        public SinkStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
            SinkStatus sinkStatus = new SinkStatus();
            for (int i = 0; i < parallelism; ++i) {
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = (SinkStatus.SinkInstanceStatus.SinkInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, i, null);
                SinkStatus.SinkInstanceStatus sinkInstanceStatus2 = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus2.setInstanceId(i);
                sinkInstanceStatus2.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(sinkInstanceStatus2);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
                if (sinkInstanceStatus.getStatus().isRunning()) {
                    ++sinkStatus.numRunning;
                }
            });
            return sinkStatus;
        }

        @Override
        public SinkStatus emptyStatus(int parallelism) {
            SinkStatus sinkStatus = new SinkStatus();
            sinkStatus.setNumInstances(parallelism);
            sinkStatus.setNumRunning(0);
            for (int i = 0; i < parallelism; ++i) {
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(i);
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
                sinkInstanceStatusData.setRunning(false);
                sinkInstanceStatusData.setError("Sink has not been scheduled");
                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            return sinkStatus;
        }
    }
}

