/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.IngestionRuntime;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.api.IFeedLifecycleListener;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.asterix.metadata.feeds.FeedCollectOperatorNodePushable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

public class FeedCollectOperatorDescriptor
extends AbstractSingleActivityOperatorDescriptor {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
    private final IAType outputType;
    private final FeedConnectionId connectionId;
    private final Map<String, String> feedPolicyProperties;
    private IFeedSubscriptionManager subscriptionManager;
    private final FeedId sourceFeedId;
    private final IFeedLifecycleListener.ConnectionLocation subscriptionLocation;

    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, IFeedLifecycleListener.ConnectionLocation subscriptionLocation) {
        super((IOperatorDescriptorRegistry)spec, 0, 1);
        this.recordDescriptors[0] = rDesc;
        this.outputType = atype;
        this.connectionId = feedConnectionId;
        this.feedPolicyProperties = feedPolicyProperties;
        this.sourceFeedId = sourceFeedId;
        this.subscriptionLocation = subscriptionLocation;
    }

    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        this.subscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
        IngestionRuntime sourceRuntime = null;
        FeedCollectOperatorNodePushable nodePushable = null;
        switch (this.subscriptionLocation) {
            case SOURCE_FEED_INTAKE_STAGE: {
                try {
                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(this.sourceFeedId, IFeedRuntime.FeedRuntimeType.INTAKE, partition);
                    sourceRuntime = this.getIntakeRuntime(feedSubscribableRuntimeId);
                    if (sourceRuntime == null) {
                        throw new HyracksDataException("Source intake task not found for source feed id " + this.sourceFeedId);
                    }
                    nodePushable = new FeedCollectOperatorNodePushable(ctx, this.sourceFeedId, this.connectionId, this.feedPolicyProperties, partition, nPartitions, (ISubscribableRuntime)sourceRuntime);
                    break;
                }
                catch (Exception exception) {
                    if (LOGGER.isLoggable(Level.SEVERE)) {
                        LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
                    }
                    throw new HyracksDataException("Initialization of the feed adapter failed", (Throwable)exception);
                }
            }
            case SOURCE_FEED_COMPUTE_STAGE: {
                SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(this.sourceFeedId, IFeedRuntime.FeedRuntimeType.COMPUTE, partition);
                sourceRuntime = this.subscriptionManager.getSubscribableRuntime(feedSubscribableRuntimeId);
                if (sourceRuntime == null) {
                    throw new HyracksDataException("Source compute task not found for source feed id " + this.sourceFeedId + " " + IFeedRuntime.FeedRuntimeType.COMPUTE + "[" + partition + "]");
                }
                nodePushable = new FeedCollectOperatorNodePushable(ctx, this.sourceFeedId, this.connectionId, this.feedPolicyProperties, partition, nPartitions, (ISubscribableRuntime)sourceRuntime);
            }
        }
        return nodePushable;
    }

    public FeedConnectionId getFeedConnectionId() {
        return this.connectionId;
    }

    public Map<String, String> getFeedPolicyProperties() {
        return this.feedPolicyProperties;
    }

    public IAType getOutputType() {
        return this.outputType;
    }

    public RecordDescriptor getRecordDescriptor() {
        return this.recordDescriptors[0];
    }

    public FeedId getSourceFeedId() {
        return this.sourceFeedId;
    }

    private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
        int waitCycleCount = 0;
        ISubscribableRuntime ingestionRuntime = this.subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
        while (ingestionRuntime == null && waitCycleCount < 10) {
            block3: {
                try {
                    Thread.sleep(2000L);
                    ++waitCycleCount;
                    if (!LOGGER.isLoggable(Level.INFO)) break block3;
                    LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
            ingestionRuntime = this.subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
        }
        return (IngestionRuntime)ingestionRuntime;
    }

    public IFeedLifecycleListener.ConnectionLocation getSubscriptionLocation() {
        return this.subscriptionLocation;
    }
}

