/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.entities.PrimaryFeed;
import org.apache.asterix.metadata.entities.SecondaryFeed;
import org.apache.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.metadata.functions.ExternalLibraryManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;

public class FeedUtil {
    /** Class-wide logger; never reassigned, hence final. */
    private static final Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());

    /**
     * Builds the textual key for a feed point.
     * <p>
     * The key is {@code dataverse:feedName:} followed by the applied functions joined with
     * {@code ':'}. When no functions are supplied the key keeps its trailing {@code ':'}.
     *
     * @param feed the feed whose dataverse name and feed name prefix the key
     * @param appliedFunctions function names to append; may be {@code null} or empty
     * @return the key representation
     */
    public static String getFeedPointKeyRep(Feed feed, List<String> appliedFunctions) {
        StringBuilder key = new StringBuilder();
        key.append(feed.getDataverseName()).append(':');
        key.append(feed.getFeedName()).append(':');
        if (appliedFunctions == null || appliedFunctions.isEmpty()) {
            return key.toString();
        }
        boolean first = true;
        for (String fn : appliedFunctions) {
            if (!first) {
                key.append(':');
            }
            key.append(fn);
            first = false;
        }
        return key.toString();
    }

    /**
     * Fetches a dataset from the metadata manager and verifies that it is of type
     * {@code INTERNAL}.
     *
     * @param dataverse dataverse to look in
     * @param datasetName name of the dataset
     * @param ctx metadata transaction used for the lookup
     * @return the dataset, never {@code null}
     * @throws AsterixException if the dataset does not exist or is not an internal dataset
     */
    public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx) throws AsterixException {
        final Dataset target = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
        if (target == null) {
            throw new AsterixException("Unknown target dataset :" + datasetName);
        }
        final boolean internal = target.getDatasetType().equals(DatasetConfig.DatasetType.INTERNAL);
        if (internal) {
            return target;
        }
        throw new AsterixException("Statement not applicable. Dataset " + datasetName + " is not of required type " + DatasetConfig.DatasetType.INTERNAL);
    }

    /**
     * Fetches a feed from the metadata manager, failing if it is not defined.
     *
     * @param dataverse dataverse to look in
     * @param feedName name of the feed
     * @param ctx metadata transaction used for the lookup
     * @return the feed, never {@code null}
     * @throws AsterixException if no such feed exists
     * @throws MetadataException declared for the metadata lookup
     */
    public static Feed validateIfFeedExists(String dataverse, String feedName, MetadataTransactionContext ctx) throws MetadataException, AsterixException {
        final Feed source = MetadataManager.INSTANCE.getFeed(ctx, dataverse, feedName);
        if (source != null) {
            return source;
        }
        throw new AsterixException("Unknown source feed: " + feedName);
    }

    /**
     * Resolves a feed policy by name, looking first in the given dataverse and then in the
     * {@code "Metadata"} dataverse.
     *
     * @param dataverse dataverse searched first
     * @param policyName name of the policy
     * @param ctx metadata transaction used for the lookups
     * @return the policy, never {@code null}
     * @throws AsterixException if the policy is found in neither dataverse
     */
    public static FeedPolicy validateIfPolicyExists(String dataverse, String policyName, MetadataTransactionContext ctx) throws AsterixException {
        FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
        if (feedPolicy == null) {
            // Fall back to the policies stored in the "Metadata" dataverse.
            feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, "Metadata", policyName);
        }
        if (feedPolicy == null) {
            // The separator was previously missing, gluing the policy name onto the message.
            throw new AsterixException("Unknown feed policy: " + policyName);
        }
        return feedPolicy;
    }

    /**
     * Builds a new job specification from {@code spec} in which operators are re-created or
     * wrapped for feed execution.
     * <ul>
     * <li>{@link FeedCollectOperatorDescriptor}s are re-instantiated against the new spec.</li>
     * <li>LSM tree / inverted-index insert-delete operators are wrapped in a
     * {@link FeedMetaOperatorDescriptor} with runtime type {@code STORE} and the index name as
     * operand id.</li>
     * <li>Algebricks meta operators and other single-activity operators are wrapped with runtime
     * type {@code COMPUTE}, {@code ETS} or {@code OTHER}.</li>
     * <li>Any other operator is registered with the new spec as-is.</li>
     * </ul>
     * Connectors, edges, partition count/location constraints, roots, the connector policy
     * settings and the joblet event listener factory are then carried over.
     *
     * @param spec the original job specification (read, not structurally modified here)
     * @param feedConnectionId connection the wrapped operators are created for
     * @param feedPolicyProperties policy properties handed to every meta operator
     * @return the altered job specification
     */
    public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec, FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) {
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Original Job Spec:" + spec);
        }
        JobSpecification altered = new JobSpecification(spec.getFrameSize());
        boolean preProcessingRequired = FeedUtil.preProcessingRequired(feedConnectionId);

        // Operator id in the original spec -> id of its counterpart in the altered spec.
        Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
        for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
            OperatorDescriptorId newId;
            if (opDesc instanceof FeedCollectOperatorDescriptor) {
                FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc;
                FeedCollectOperatorDescriptor copy = new FeedCollectOperatorDescriptor(altered, orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
                newId = copy.getOperatorId();
            } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
                String indexName = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
                newId = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties, IFeedRuntime.FeedRuntimeType.STORE, false, indexName).getOperatorId();
            } else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
                String indexName = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
                newId = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties, IFeedRuntime.FeedRuntimeType.STORE, false, indexName).getOperatorId();
            } else {
                IFeedRuntime.FeedRuntimeType runtimeType = null;
                // Must start out false: only an assign directly fed by the collect operator may enable it.
                boolean enableSubscriptionMode = false;
                boolean createMetaOp = true;
                if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                    IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline().getRuntimeFactories()[0];
                    if (runtimeFactory instanceof AssignRuntimeFactory) {
                        IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId()).get(0);
                        IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc);
                        if (sourceOp instanceof FeedCollectOperatorDescriptor) {
                            runtimeType = preProcessingRequired ? IFeedRuntime.FeedRuntimeType.COMPUTE : IFeedRuntime.FeedRuntimeType.OTHER;
                            enableSubscriptionMode = preProcessingRequired;
                        } else {
                            runtimeType = IFeedRuntime.FeedRuntimeType.OTHER;
                        }
                    } else if (runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) {
                        runtimeType = IFeedRuntime.FeedRuntimeType.ETS;
                    } else {
                        runtimeType = IFeedRuntime.FeedRuntimeType.OTHER;
                    }
                } else if (opDesc instanceof AbstractSingleActivityOperatorDescriptor) {
                    runtimeType = IFeedRuntime.FeedRuntimeType.OTHER;
                } else {
                    createMetaOp = false;
                }
                if (createMetaOp) {
                    newId = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties, runtimeType, enableSubscriptionMode, "N/A").getOperatorId();
                } else {
                    // Not wrappable: register the operator itself with the new spec.
                    newId = altered.createOperatorDescriptorId(opDesc);
                }
            }
            oldNewOID.put(opDesc.getOperatorId(), newId);
        }

        // Register every connector with the new spec, remembering old id -> new id.
        Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
        for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
            connectorMapping.put(entry.getKey(), altered.createConnectorDescriptor(entry.getValue()));
        }

        // Re-create the edges between the new operators.
        for (Map.Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec.getConnectorOperatorMap().entrySet()) {
            IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
            Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
            Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
            IOperatorDescriptor leftOpDesc = altered.getOperatorMap().get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
            IOperatorDescriptor rightOpDesc = altered.getOperatorMap().get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
            altered.connect(connDesc, leftOpDesc, leftOp.getRight().intValue(), rightOpDesc, rightOp.getRight().intValue());
        }

        // Collect partition constraints. Locations are keyed by the NEW operator id,
        // counts by the OLD operator id (translated when applied below).
        Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<OperatorDescriptorId, List<LocationConstraint>>();
        Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
        for (Constraint constraint : spec.getUserConstraints()) {
            LValueConstraintExpression lexpr = constraint.getLValue();
            ConstraintExpression cexpr = constraint.getRValue();
            switch (lexpr.getTag()) {
                case PARTITION_COUNT: {
                    OperatorDescriptorId oldId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
                    operatorCounts.put(oldId, (Integer) ((ConstantExpression) cexpr).getValue());
                    break;
                }
                case PARTITION_LOCATION: {
                    PartitionLocationExpression locationExpr = (PartitionLocationExpression) lexpr;
                    OperatorDescriptorId newId = oldNewOID.get(locationExpr.getOperatorDescriptorId());
                    List<LocationConstraint> locations = operatorLocations.get(newId);
                    if (locations == null) {
                        locations = new ArrayList<LocationConstraint>();
                        operatorLocations.put(newId, locations);
                    }
                    LocationConstraint lc = new LocationConstraint();
                    lc.location = (String) ((ConstantExpression) cexpr).getValue();
                    lc.partition = locationExpr.getPartition();
                    locations.add(lc);
                    break;
                }
                default:
                    break;
            }
        }

        // Apply absolute location constraints, ordered by partition number.
        for (Map.Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
            // The key already is the new id; it must not be sent through the old->new map again.
            IOperatorDescriptor opDesc = altered.getOperatorMap().get(entry.getKey());
            List<LocationConstraint> constraints = entry.getValue();
            Collections.sort(constraints, new Comparator<LocationConstraint>() {

                @Override
                public int compare(LocationConstraint o1, LocationConstraint o2) {
                    return Integer.compare(o1.partition, o2.partition);
                }
            });
            String[] locations = new String[constraints.size()];
            for (int i = 0; i < locations.length; ++i) {
                locations[i] = constraints.get(i).location;
            }
            PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations);
        }

        // Apply count constraints only to operators that received no location constraint
        // (presumably the absolute location constraint already fixes the count - confirm
        // against PartitionConstraintHelper).
        for (Map.Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
            OperatorDescriptorId newId = oldNewOID.get(entry.getKey());
            if (operatorLocations.containsKey(newId)) {
                continue;
            }
            PartitionConstraintHelper.addPartitionCountConstraint(altered, altered.getOperatorMap().get(newId), entry.getValue().intValue());
        }

        altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
        altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
        for (OperatorDescriptorId rootId : spec.getRoots()) {
            altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(rootId)));
        }
        altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("New Job Spec:" + altered);
        }
        return altered;
    }

    /**
     * Rewires the compute stage of {@code spec} for a larger cardinality and pins it to the
     * first {@code requiredCardinality} entries of {@code newLocations}.
     *
     * @param spec job specification to modify in place
     * @param compute not used by this method
     * @param requiredCardinality number of compute partitions wanted
     * @param newLocations candidate locations; must hold at least {@code requiredCardinality} entries
     * @throws AsterixException if the job specification cannot be altered
     */
    public static void increaseCardinality(JobSpecification spec, IFeedRuntime.FeedRuntimeType compute, int requiredCardinality, List<String> newLocations) throws AsterixException {
        final IOperatorDescriptor computeOp = FeedUtil.alterJobSpecForComputeCardinality(spec, requiredCardinality);
        final String[] targetLocations = FeedUtil.nChooseK(requiredCardinality, newLocations);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, computeOp, targetLocations);
    }

    /**
     * Rewires the compute stage of {@code spec} for a smaller cardinality and pins it to the
     * first {@code requiredCardinality} entries of {@code currentLocations}.
     *
     * @param spec job specification to modify in place
     * @param compute not used by this method
     * @param requiredCardinality number of compute partitions wanted
     * @param currentLocations locations in use; must hold at least {@code requiredCardinality} entries
     * @throws AsterixException if the job specification cannot be altered
     */
    public static void decreaseComputeCardinality(JobSpecification spec, IFeedRuntime.FeedRuntimeType compute, int requiredCardinality, List<String> currentLocations) throws AsterixException {
        final IOperatorDescriptor computeOp = FeedUtil.alterJobSpecForComputeCardinality(spec, requiredCardinality);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, computeOp, FeedUtil.nChooseK(requiredCardinality, currentLocations));
    }

    /**
     * Replaces the connector between the feed collect operator and the compute stage with an
     * M:N random-partitioning connector sized for {@code requiredCardinality}, and removes the
     * partition count and location constraints currently attached to the compute operator so
     * that the caller can install new ones.
     *
     * @param spec job specification to modify in place
     * @param requiredCardinality number of partitions the new connector distributes over
     * @return the compute operator whose constraints were removed, or {@code null} if no
     *         constraint referenced a compute operator
     * @throws AsterixException if the feed has no compute stage directly after the collect
     *         operator, or the existing connector is not registered as expected
     */
    private static IOperatorDescriptor alterJobSpecForComputeCardinality(JobSpecification spec, int requiredCardinality) throws AsterixException {
        Map<ConnectorDescriptorId, IConnectorDescriptor> connectors = spec.getConnectorMap();
        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec.getConnectorOperatorMap();

        // Locate the connector leading out of the collect operator.
        IOperatorDescriptor sourceOp = null;
        IOperatorDescriptor targetOp = null;
        IConnectorDescriptor connDesc = null;
        for (Map.Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap.entrySet()) {
            IOperatorDescriptor producer = entry.getValue().getLeft().getLeft();
            if (!(producer instanceof FeedCollectOperatorDescriptor)) {
                continue;
            }
            IOperatorDescriptor consumer = entry.getValue().getRight().getLeft();
            if (!isComputeMetaOperator(consumer)) {
                throw new AsterixException("Incorrect manipulation, feed does not have a compute stage");
            }
            sourceOp = producer;
            targetOp = consumer;
            connDesc = connectors.get(entry.getKey());
            break;
        }
        if (sourceOp == null || targetOp == null || connDesc == null) {
            // No collect -> compute edge at all; previously this surfaced as a NullPointerException.
            throw new AsterixException("Incorrect manipulation, feed does not have a compute stage");
        }

        // Detach the old connector from both endpoints and from the spec's registries.
        if (!spec.getOperatorInputMap().get(targetOp.getOperatorId()).remove(connDesc)) {
            throw new AsterixException("Connector desc not found");
        }
        if (!spec.getOperatorOutputMap().get(sourceOp.getOperatorId()).remove(connDesc)) {
            throw new AsterixException("Connector desc not found");
        }
        spec.getConnectorMap().remove(connDesc.getConnectorId());
        connectorOpMap.remove(connDesc.getConnectorId());

        // Wire in the replacement connector.
        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(requiredCardinality);
        MToNPartitioningConnectorDescriptor newConnector = new MToNPartitioningConnectorDescriptor(spec, tpcf);
        spec.getConnectorMap().put(newConnector.getConnectorId(), newConnector);
        spec.connect(newConnector, sourceOp, 0, targetOp, 0);

        // Gather every count/location constraint attached to a compute operator. All of them
        // are removed; removing only the last location constraint would leave stale
        // per-partition constraints behind.
        Set<Constraint> userConstraints = spec.getUserConstraints();
        List<Constraint> staleConstraints = new ArrayList<Constraint>();
        IOperatorDescriptor changingOpDesc = null;
        for (Constraint constraint : userConstraints) {
            LValueConstraintExpression lexpr = constraint.getLValue();
            OperatorDescriptorId opId;
            switch (lexpr.getTag()) {
                case PARTITION_COUNT:
                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
                    break;
                case PARTITION_LOCATION:
                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
                    break;
                default:
                    continue;
            }
            IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
            if (isComputeMetaOperator(opDesc)) {
                staleConstraints.add(constraint);
                changingOpDesc = opDesc;
            }
        }
        // Removal happens after the scan so the set is not modified while being iterated.
        userConstraints.removeAll(staleConstraints);
        return changingOpDesc;
    }

    /** Tells whether the operator is a feed meta operator whose runtime type is COMPUTE. */
    private static boolean isComputeMetaOperator(IOperatorDescriptor opDesc) {
        return opDesc instanceof FeedMetaOperatorDescriptor
                && IFeedRuntime.FeedRuntimeType.COMPUTE.equals(((FeedMetaOperatorDescriptor) opDesc).getRuntimeType());
    }

    /**
     * Copies the first {@code k} entries of {@code locations} into a new array.
     *
     * @param k number of entries to take
     * @param locations source list; read by index from 0 to {@code k - 1}
     * @return an array of length {@code k} holding the leading entries in list order
     */
    private static String[] nChooseK(int k, List<String> locations) {
        final String[] chosen = new String[k];
        int idx = 0;
        while (idx < k) {
            chosen[idx] = locations.get(idx);
            idx++;
        }
        return chosen;
    }

    /**
     * Tells whether the feed behind {@code connectionId} has an applied function, in which case
     * the feed needs a pre-processing (compute) stage.
     * <p>
     * The lookup runs in its own metadata transaction under the metadata read latch. This is a
     * best-effort check: if the lookup fails the failure is logged, the transaction is aborted
     * and the value determined so far ({@code false} unless the failure happened at commit) is
     * returned.
     *
     * @param connectionId identifies the feed to inspect
     * @return {@code true} if the feed exists and has an applied function
     * @throws IllegalStateException if the lookup fails and the transaction cannot be aborted
     */
    private static boolean preProcessingRequired(FeedConnectionId connectionId) {
        boolean preProcessingRequired = false;
        MetadataTransactionContext ctx = null;
        // Acquire outside the try so the finally block never releases a latch that was not taken.
        MetadataManager.INSTANCE.acquireReadLatch();
        try {
            ctx = MetadataManager.INSTANCE.beginTransaction();
            Feed feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(), connectionId.getFeedId().getFeedName());
            preProcessingRequired = feed != null && feed.getAppliedFunction() != null;
            MetadataManager.INSTANCE.commitTransaction(ctx);
        } catch (Exception e) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "Unable to determine whether pre-processing is required for " + connectionId, e);
            }
            if (ctx != null) {
                try {
                    MetadataManager.INSTANCE.abortTransaction(ctx);
                } catch (Exception abortException) {
                    e.addSuppressed(abortException);
                    throw new IllegalStateException(e);
                }
            }
        } finally {
            MetadataManager.INSTANCE.releaseReadLatch();
        }
        return preProcessingRequired;
    }

    /**
     * Instantiates and configures the adapter factory of a primary feed and determines the
     * record type the adapter produces.
     * <p>
     * The adapter is looked up in the {@code "Metadata"} dataverse first and then in the feed's
     * own dataverse. If no adapter entity exists, the adapter name is resolved through
     * {@code AqlMetadataProvider.adapterFactoryMapping}, falling back to treating the name
     * itself as the factory class name, and the adapter is considered {@code INTERNAL}.
     * <p>
     * NOTE(review): the feed policy entries are added to the map returned by
     * {@code feed.getAdaptorConfiguration()}; if that is the feed's own map it is modified by
     * this call - confirm this is intended.
     *
     * @param feed the primary feed
     * @param policyAccessor supplies the feed policy entries merged into the adapter configuration
     * @param mdTxnCtx metadata transaction used for the adapter lookup
     * @return the configured factory, the adapter output type and the adapter type
     * @throws AlgebricksException if the adapter cannot be resolved, instantiated or configured;
     *         the underlying failure is attached as the cause
     */
    public static Triple<IFeedAdapterFactory, ARecordType, DatasourceAdapter.AdapterType> getPrimaryFeedFactoryAndOutput(PrimaryFeed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
        try {
            String adapterName = feed.getAdaptorName();
            DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, "Metadata", adapterName);
            if (adapterEntity == null) {
                adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
            }

            IFeedAdapterFactory adapterFactory;
            DatasourceAdapter.AdapterType adapterType;
            if (adapterEntity != null) {
                adapterType = adapterEntity.getType();
                String adapterFactoryClassname = adapterEntity.getClassname();
                switch (adapterType) {
                    case INTERNAL: {
                        adapterFactory = (IFeedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                        break;
                    }
                    case EXTERNAL: {
                        // External adapter names have the form "<library>#<adapter>".
                        String libraryName = adapterName.split("#")[0];
                        ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName);
                        adapterFactory = (IFeedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                        break;
                    }
                    default:
                        throw new IllegalStateException("Unsupported adapter type " + adapterType + " for adapter " + adapterName);
                }
            } else {
                String adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
                if (adapterFactoryClassname == null) {
                    adapterFactoryClassname = adapterName;
                }
                adapterFactory = (IFeedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                adapterType = DatasourceAdapter.AdapterType.INTERNAL;
            }

            Map<String, String> configuration = feed.getAdaptorConfiguration();
            configuration.putAll(policyAccessor.getFeedPolicy());
            ARecordType adapterOutputType = FeedUtil.getOutputType(feed, configuration);
            adapterFactory.configure(configuration, adapterOutputType);
            return new Triple<IFeedAdapterFactory, ARecordType, DatasourceAdapter.AdapterType>(adapterFactory, adapterOutputType, adapterType);
        } catch (Exception e) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "unable to create adapter", e);
            }
            // Keep the original message text but chain the cause so the stack trace is not lost.
            throw new AlgebricksException("unable to create adapter " + e, e);
        }
    }

    /**
     * Resolves the record type named by the {@code "type-name"} configuration entry.
     * <p>
     * The value is either {@code <datatype>} (resolved in the feed's dataverse) or
     * {@code <dataverse>.<datatype>}. The lookup runs in its own metadata transaction under the
     * metadata read latch.
     *
     * @param feed feed whose dataverse is used for unqualified type names
     * @param configuration adapter configuration containing {@code "type-name"}
     * @return the record type of the named datatype
     * @throws IllegalArgumentException if {@code "type-name"} is missing, malformed, or names an
     *         unknown datatype
     * @throws IllegalStateException if the datatype is not a record type
     * @throws Exception any failure raised by the metadata manager
     */
    private static ARecordType getOutputType(PrimaryFeed feed, Map<String, String> configuration) throws Exception {
        String fqOutputType = configuration.get("type-name");
        if (fqOutputType == null) {
            throw new IllegalArgumentException("No output type specified");
        }
        String dataverseName;
        String datatypeName;
        String[] dataverseAndType = fqOutputType.split("[.]");
        if (dataverseAndType.length == 1) {
            datatypeName = dataverseAndType[0];
            dataverseName = feed.getDataverseName();
        } else if (dataverseAndType.length == 2) {
            dataverseName = dataverseAndType[0];
            datatypeName = dataverseAndType[1];
        } else {
            throw new IllegalArgumentException("Invalid value for the parameter type-name");
        }

        MetadataTransactionContext ctx = null;
        MetadataManager.INSTANCE.acquireReadLatch();
        try {
            ctx = MetadataManager.INSTANCE.beginTransaction();
            Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
            if (t == null) {
                throw new IllegalArgumentException("Unknown output type " + dataverseName + "." + datatypeName);
            }
            IAType type = t.getDatatype();
            if (type.getTypeTag() != ATypeTag.RECORD) {
                throw new IllegalStateException("Output type " + dataverseName + "." + datatypeName + " is not a record type");
            }
            ARecordType outputType = (ARecordType) type;
            MetadataManager.INSTANCE.commitTransaction(ctx);
            return outputType;
        } catch (Exception e) {
            if (ctx != null) {
                try {
                    MetadataManager.INSTANCE.abortTransaction(ctx);
                } catch (Exception abortException) {
                    // Do not let a failing abort hide the exception that caused it.
                    e.addSuppressed(abortException);
                }
            }
            throw e;
        } finally {
            MetadataManager.INSTANCE.releaseReadLatch();
        }
    }

    /**
     * Determines the name of the output type of a secondary feed from its source feed.
     * <p>
     * If the source feed has no applied function, the output type is that of the source feed's
     * adapter. Otherwise it is the return type of the applied function, which must not be an
     * AQL function.
     *
     * @param feed the secondary feed
     * @param policyAccessor feed policy passed on when the source adapter is inspected
     * @param mdTxnCtx metadata transaction used for the lookups
     * @return the output type name
     * @throws IllegalArgumentException if the source feed or its applied function is not found
     * @throws NotImplementedException if the applied function is an AQL function
     * @throws AlgebricksException if the source feed's adapter cannot be created
     * @throws MetadataException declared for the metadata lookups
     */
    public static String getSecondaryFeedOutput(SecondaryFeed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
        String primaryFeedName = feed.getSourceFeedName();
        Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
        if (primaryFeed == null) {
            // Previously this surfaced as a NullPointerException on the next line.
            throw new IllegalArgumentException("Source feed " + primaryFeedName + " not found in dataverse " + feed.getDataverseName() + ".");
        }
        FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
        if (appliedFunction == null) {
            Triple<IFeedAdapterFactory, ARecordType, DatasourceAdapter.AdapterType> result = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) primaryFeed, policyAccessor, mdTxnCtx);
            return result.second.getTypeName();
        }
        Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
        if (function == null) {
            throw new IllegalArgumentException("Function " + appliedFunction + " associated with source feed not found in Metadata.");
        }
        if ("AQL".equals(function.getLanguage())) {
            throw new NotImplementedException("Secondary feeds derived from a source feed that has an applied AQL function are not supported yet.");
        }
        return function.getReturnType();
    }

    /**
     * Mutable holder pairing a partition number with the location assigned to it; instances
     * are created and filled in by the enclosing class only.
     */
    private static class LocationConstraint {
        /** Location value read from the constraint's constant expression. */
        String location;
        /** Partition number read from the partition-location expression. */
        int partition;
    }
}

