/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.CollectionRuntime;
import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.IngestionRuntime;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
import org.apache.asterix.common.feeds.api.IFeedAdapter;
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
import org.apache.asterix.metadata.feeds.AdapterRuntimeManager;
import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

public class FeedIntakeOperatorNodePushable
extends AbstractUnaryOutputSourceOperatorNodePushable {
    private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
    private final FeedId feedId;
    private final int partition;
    private final IFeedSubscriptionManager feedSubscriptionManager;
    private final IFeedManager feedManager;
    private final IHyracksTaskContext ctx;
    private final IFeedAdapterFactory adapterFactory;
    private final FeedPolicyAccessor policyAccessor;
    private IngestionRuntime ingestionRuntime;
    private IFeedAdapter adapter;
    private IIntakeProgressTracker tracker;
    private DistributeFeedFrameWriter feedFrameWriter;

    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IFeedAdapterFactory adapterFactory, int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
        this.ctx = ctx;
        this.feedId = feedId;
        this.partition = partition;
        this.ingestionRuntime = ingestionRuntime;
        this.adapterFactory = adapterFactory;
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        this.feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
        this.feedManager = runtimeCtx.getFeedManager();
        this.policyAccessor = policyAccessor;
    }

    public void initialize() throws HyracksDataException {
        block28: {
            AdapterRuntimeManager adapterRuntimeManager = null;
            try {
                if (this.ingestionRuntime == null) {
                    try {
                        this.adapter = (IFeedAdapter)this.adapterFactory.createAdapter(this.ctx, this.partition);
                        if (this.adapterFactory.isRecordTrackingEnabled()) {
                            this.tracker = this.adapterFactory.createIntakeProgressTracker();
                        }
                    }
                    catch (Exception e) {
                        LOGGER.severe("Unable to create adapter : " + this.adapterFactory.getName() + "[" + this.partition + "]" + " Exception " + e);
                        throw new HyracksDataException((Throwable)e);
                    }
                    FrameTupleAccessor fta = new FrameTupleAccessor(this.recordDesc);
                    this.feedFrameWriter = new DistributeFeedFrameWriter(this.ctx, this.feedId, this.writer, IFeedRuntime.FeedRuntimeType.INTAKE, this.partition, fta, this.feedManager);
                    adapterRuntimeManager = new AdapterRuntimeManager(this.feedId, this.adapter, this.tracker, this.feedFrameWriter, this.partition);
                    SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(this.feedId, IFeedRuntime.FeedRuntimeType.INTAKE, this.partition);
                    this.ingestionRuntime = new IngestionRuntime(this.feedId, (FeedRuntimeId)runtimeId, this.feedFrameWriter, this.recordDesc, (IAdapterRuntimeManager)adapterRuntimeManager);
                    this.feedSubscriptionManager.registerFeedSubscribableRuntime((ISubscribableRuntime)this.ingestionRuntime);
                    this.feedFrameWriter.open();
                } else if (this.ingestionRuntime.getAdapterRuntimeManager().getState().equals((Object)IAdapterRuntimeManager.State.INACTIVE_INGESTION)) {
                    this.ingestionRuntime.getAdapterRuntimeManager().setState(IAdapterRuntimeManager.State.ACTIVE_INGESTION);
                    this.adapter = this.ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
                    if (LOGGER.isLoggable(Level.INFO)) {
                        LOGGER.info(" Switching to " + IAdapterRuntimeManager.State.ACTIVE_INGESTION + " for ingestion runtime " + this.ingestionRuntime);
                        LOGGER.info(" Adaptor " + this.adapter.getClass().getName() + "[" + this.partition + "]" + " connected to backend for feed " + this.feedId);
                    }
                    this.feedFrameWriter = this.ingestionRuntime.getFeedFrameWriter();
                } else {
                    String message = "Feed Ingestion Runtime for feed " + this.feedId + " is already registered and is active!.";
                    LOGGER.severe(message);
                    throw new IllegalStateException(message);
                }
                this.waitTillIngestionIsOver(adapterRuntimeManager);
                this.feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId)this.ingestionRuntime.getRuntimeId());
                if (adapterRuntimeManager.getState().equals((Object)IAdapterRuntimeManager.State.FAILED_INGESTION)) {
                    throw new HyracksDataException("Unable to ingest data");
                }
            }
            catch (InterruptedException ie) {
                List subscribers = this.ingestionRuntime.getSubscribers();
                FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap());
                boolean needToHandleFailure = false;
                ArrayList<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
                for (ISubscriberRuntime subscriber : subscribers) {
                    policyAccessor.reset(subscriber.getFeedPolicy());
                    if (!policyAccessor.continueOnHardwareFailure()) {
                        failingSubscribers.add(subscriber);
                        continue;
                    }
                    needToHandleFailure = true;
                }
                for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
                    try {
                        this.ingestionRuntime.unsubscribeFeed((CollectionRuntime)failingSubscriber);
                    }
                    catch (Exception e) {
                        if (!LOGGER.isLoggable(Level.WARNING)) continue;
                        LOGGER.warning("Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
                    }
                }
                if (needToHandleFailure) {
                    this.ingestionRuntime.getAdapterRuntimeManager().setState(IAdapterRuntimeManager.State.INACTIVE_INGESTION);
                    if (LOGGER.isLoggable(Level.INFO)) {
                        LOGGER.info("Switching to " + IAdapterRuntimeManager.State.INACTIVE_INGESTION + " on occurrence of failure.");
                    }
                    break block28;
                }
                if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
                }
                this.feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId)this.ingestionRuntime.getRuntimeId());
                throw new HyracksDataException((Throwable)ie);
            }
            catch (Exception e) {
                e.printStackTrace();
                throw new HyracksDataException((Throwable)e);
            }
            finally {
                if (this.ingestionRuntime != null && !this.ingestionRuntime.getAdapterRuntimeManager().getState().equals((Object)IAdapterRuntimeManager.State.INACTIVE_INGESTION)) {
                    this.feedFrameWriter.close();
                    if (LOGGER.isLoggable(Level.INFO)) {
                        LOGGER.info("Closed Frame Writer " + this.feedFrameWriter + " adapter state " + this.ingestionRuntime.getAdapterRuntimeManager().getState());
                    }
                } else if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("Ending intake operator node pushable in state " + IAdapterRuntimeManager.State.INACTIVE_INGESTION + " Will resume after correcting failure");
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Waiting for adaptor [" + this.partition + "]" + "to be done with ingestion of feed " + this.feedId);
        }
        IAdapterRuntimeManager iAdapterRuntimeManager = adapterRuntimeManager;
        synchronized (iAdapterRuntimeManager) {
            while (!adapterRuntimeManager.getState().equals((Object)IAdapterRuntimeManager.State.FINISHED_INGESTION) && !adapterRuntimeManager.getState().equals((Object)IAdapterRuntimeManager.State.FAILED_INGESTION)) {
                adapterRuntimeManager.wait();
            }
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info(" Adaptor " + this.adapter.getClass().getName() + "[" + this.partition + "]" + " done with ingestion of feed " + this.feedId);
        }
    }
}

