/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.util.Map;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.IngestionRuntime;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
import org.apache.asterix.metadata.entities.PrimaryFeed;
import org.apache.asterix.metadata.feeds.FeedIntakeOperatorNodePushable;
import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.metadata.functions.ExternalLibraryManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

/**
 * Operator descriptor for the intake stage of a feed. It has no inputs and a
 * single output (see the {@code 0, 1} arity passed to the superclass).
 *
 * <p>The descriptor is built in one of two ways:
 * <ul>
 *   <li>with a ready-made {@link IFeedAdapterFactory}, or</li>
 *   <li>with the name of an external library plus the class name of an
 *       adapter factory inside it; in that case the factory is instantiated
 *       on the first call to {@link #createPushRuntime} and cached in this
 *       descriptor instance.</li>
 * </ul>
 */
public class FeedIntakeOperatorDescriptor
extends AbstractSingleActivityOperatorDescriptor {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());

    /** Identifies the feed by the dataverse name and feed name of the primary feed. */
    private final FeedId feedId;

    /** Policy accessor handed through unchanged to the node pushable. */
    private final FeedPolicyAccessor policyAccessor;

    /** Adapter factory; {@code null} until resolved when an external library is used. */
    private IFeedAdapterFactory adaptorFactory;

    /** Name of the external library holding the adapter factory class (external case only). */
    private String adaptorLibraryName;

    /** Class name of the adapter factory to load from the library (external case only). */
    private String adaptorFactoryClassName;

    /** Configuration passed to an externally loaded adapter factory (external case only). */
    private Map<String, String> adaptorConfiguration;

    /** Record type passed to an externally loaded adapter factory when it is configured. */
    private ARecordType adapterOutputType;

    /**
     * Creates a descriptor around an already constructed adapter factory.
     *
     * @param spec job specification this operator is registered with
     * @param primaryFeed feed whose dataverse name and feed name form the {@link FeedId}
     * @param adapterFactory adapter factory to use for the intake runtime
     * @param adapterOutputType record type associated with the adapter output
     * @param policyAccessor policy accessor forwarded to the intake runtime
     */
    public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, IFeedAdapterFactory adapterFactory, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
        super((IOperatorDescriptorRegistry)spec, 0, 1);
        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
        this.adaptorFactory = adapterFactory;
        this.adapterOutputType = adapterOutputType;
        this.policyAccessor = policyAccessor;
    }

    /**
     * Creates a descriptor whose adapter factory lives in an external library
     * and is instantiated lazily in {@link #createPushRuntime}.
     *
     * @param spec job specification this operator is registered with
     * @param primaryFeed feed supplying the {@link FeedId} and the adaptor configuration
     * @param adapterLibraryName name of the library whose class loader is used
     * @param adapterFactoryClassName class name of the adapter factory to load
     * @param adapterOutputType record type passed to the factory when it is configured
     * @param policyAccessor policy accessor forwarded to the intake runtime
     */
    public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, String adapterLibraryName, String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
        super((IOperatorDescriptorRegistry)spec, 0, 1);
        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
        this.adaptorFactoryClassName = adapterFactoryClassName;
        this.adaptorLibraryName = adapterLibraryName;
        this.adaptorConfiguration = primaryFeed.getAdaptorConfiguration();
        this.adapterOutputType = adapterOutputType;
        this.policyAccessor = policyAccessor;
    }

    /**
     * Builds the intake node pushable for one partition.
     *
     * <p>Looks up the subscribable runtime registered for this feed, the
     * {@code INTAKE} runtime type and the given partition, and resolves the
     * external adapter factory first if none has been set yet.
     *
     * @param ctx task context; also used to reach the application runtime context
     * @param recordDescProvider not used by this operator
     * @param partition partition this runtime instance serves
     * @param nPartitions not used by this operator
     * @return a new {@link FeedIntakeOperatorNodePushable}
     * @throws HyracksDataException wrapping any exception raised while creating
     *         the external adapter factory
     */
    @Override
    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        IFeedSubscriptionManager feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
        SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(this.feedId, IFeedRuntime.FeedRuntimeType.INTAKE, partition);
        IngestionRuntime ingestionRuntime = (IngestionRuntime)feedSubscriptionManager.getSubscribableRuntime(feedIngestionId);
        if (this.adaptorFactory == null) {
            // Only the library-based constructor leaves the factory unset.
            try {
                this.adaptorFactory = this.createExternalAdapterFactory();
            }
            catch (Exception e) {
                throw new HyracksDataException(e);
            }
        }
        return new FeedIntakeOperatorNodePushable(ctx, this.feedId, this.adaptorFactory, partition, ingestionRuntime, this.policyAccessor);
    }

    /**
     * Loads, instantiates and configures the adapter factory named by
     * {@code adaptorFactoryClassName} using the class loader registered for
     * {@code adaptorLibraryName} in the feed's dataverse.
     *
     * @return the configured adapter factory
     * @throws IllegalArgumentException if no class loader is registered for the library
     * @throws ClassCastException if the loaded class is not an {@link IFeedAdapterFactory}
     * @throws Exception if the class cannot be loaded or instantiated, or if
     *         configuring the factory fails
     */
    private IFeedAdapterFactory createExternalAdapterFactory() throws Exception {
        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(this.feedId.getDataverse(), this.adaptorLibraryName);
        if (classLoader == null) {
            String message = "Unable to create adapter as class loader not configured for library " + this.adaptorLibraryName + " in dataverse " + this.feedId.getDataverse();
            LOGGER.severe(message);
            throw new IllegalArgumentException(message);
        }
        // Check the type before instantiating so that a class of the wrong
        // type never has its constructor run.
        Class<? extends IFeedAdapterFactory> factoryClass = classLoader.loadClass(this.adaptorFactoryClassName).asSubclass(IFeedAdapterFactory.class);
        // Class.newInstance() is deprecated: it rethrows checked constructor
        // exceptions unwrapped. Going through the constructor wraps them in
        // InvocationTargetException instead.
        IFeedAdapterFactory adapterFactory = factoryClass.getDeclaredConstructor().newInstance();
        adapterFactory.configure(this.adaptorConfiguration, this.adapterOutputType);
        return adapterFactory;
    }

    /**
     * Returns the identifier of the feed this intake operator belongs to.
     *
     * @return the feed id created at construction time
     */
    public FeedId getFeedId() {
        return this.feedId;
    }
}

