/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.CollectionRuntime;
import org.apache.asterix.common.feeds.FeedCollectRuntimeInputHandler;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedFrameCollector;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.FeedRuntime;
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.asterix.metadata.feeds.CollectTransformFeedFrameWriter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

public class FeedCollectOperatorNodePushable
extends AbstractUnaryOutputSourceOperatorNodePushable {
    private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
    private final int partition;
    private final FeedConnectionId connectionId;
    private final Map<String, String> feedPolicy;
    private final FeedPolicyAccessor policyAccessor;
    private final IFeedManager feedManager;
    private final ISubscribableRuntime sourceRuntime;
    private final IHyracksTaskContext ctx;
    private final int nPartitions;
    private RecordDescriptor outputRecordDescriptor;
    private FeedRuntimeInputHandler inputSideHandler;
    private CollectionRuntime collectRuntime;

    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId, FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions, ISubscribableRuntime sourceRuntime) {
        this.ctx = ctx;
        this.partition = partition;
        this.nPartitions = nPartitions;
        this.connectionId = feedConnectionId;
        this.sourceRuntime = sourceRuntime;
        this.feedPolicy = feedPolicy;
        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        this.feedManager = runtimeCtx.getFeedManager();
    }

    public void initialize() throws HyracksDataException {
        try {
            this.outputRecordDescriptor = this.recordDesc;
            IFeedRuntime.FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId)this.sourceRuntime.getRuntimeId()).getFeedRuntimeType();
            switch (sourceRuntimeType) {
                case INTAKE: {
                    this.handleCompleteConnection();
                    break;
                }
                case COMPUTE: {
                    this.handlePartialConnection();
                    break;
                }
                default: {
                    throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
                }
            }
            FeedFrameCollector.State state = this.collectRuntime.waitTillCollectionOver();
            if (state.equals((Object)FeedFrameCollector.State.FINISHED)) {
                this.feedManager.getFeedConnectionManager().deRegisterFeedRuntime(this.connectionId, this.collectRuntime.getRuntimeId());
                this.writer.close();
                this.inputSideHandler.close();
            } else if (state.equals((Object)FeedFrameCollector.State.HANDOVER)) {
                this.inputSideHandler.setMode(IFeedRuntime.Mode.STALL);
                this.writer.close();
                if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("Ending Collect Operator, the input side handler is now in " + IFeedRuntime.Mode.STALL + " and the output writer " + this.writer + " has been closed ");
                }
            }
        }
        catch (InterruptedException ie) {
            this.handleInterruptedException(ie);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new HyracksDataException((Throwable)e);
        }
    }

    private void handleCompleteConnection() throws Exception {
        FeedRuntimeId runtimeId = new FeedRuntimeId(IFeedRuntime.FeedRuntimeType.COLLECT, this.partition, "N/A");
        this.collectRuntime = (CollectionRuntime)this.feedManager.getFeedConnectionManager().getFeedRuntime(this.connectionId, runtimeId);
        if (this.collectRuntime == null) {
            this.beginNewFeed(runtimeId);
        } else {
            this.reviveOldFeed();
        }
    }

    private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
        this.writer.open();
        Object outputSideWriter = this.writer;
        if (((SubscribableFeedRuntimeId)this.sourceRuntime.getRuntimeId()).getFeedRuntimeType().equals((Object)IFeedRuntime.FeedRuntimeType.COMPUTE)) {
            outputSideWriter = new CollectTransformFeedFrameWriter(this.ctx, this.writer, this.sourceRuntime, this.outputRecordDescriptor, this.connectionId);
            this.recordDesc = this.sourceRuntime.getRecordDescriptor();
        }
        FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(this.recordDesc);
        this.inputSideHandler = new FeedCollectRuntimeInputHandler(this.ctx, this.connectionId, runtimeId, outputSideWriter, this.policyAccessor, false, tupleAccessor, this.recordDesc, this.feedManager, this.nPartitions);
        this.collectRuntime = new CollectionRuntime(this.connectionId, runtimeId, this.inputSideHandler, outputSideWriter, this.sourceRuntime, this.feedPolicy);
        this.feedManager.getFeedConnectionManager().registerFeedRuntime(this.connectionId, (FeedRuntime)this.collectRuntime);
        this.sourceRuntime.subscribeFeed(this.policyAccessor, this.collectRuntime);
    }

    private void reviveOldFeed() throws HyracksDataException {
        this.writer.open();
        this.collectRuntime.getFrameCollector().setState(FeedFrameCollector.State.ACTIVE);
        this.inputSideHandler = this.collectRuntime.getInputHandler();
        IFrameWriter innerWriter = this.inputSideHandler.getCoreOperator();
        if (innerWriter instanceof CollectTransformFeedFrameWriter) {
            ((CollectTransformFeedFrameWriter)innerWriter).reset(this.writer);
        } else {
            this.inputSideHandler.setCoreOperator(this.writer);
        }
        this.inputSideHandler.setMode(IFeedRuntime.Mode.PROCESS);
    }

    private void handlePartialConnection() throws Exception {
        FeedRuntimeId runtimeId = new FeedRuntimeId(IFeedRuntime.FeedRuntimeType.COMPUTE_COLLECT, this.partition, "N/A");
        this.writer.open();
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Beginning new feed (from existing partial connection):" + this.connectionId);
        }
        CollectTransformFeedFrameWriter wrapper = new CollectTransformFeedFrameWriter(this.ctx, this.writer, this.sourceRuntime, this.outputRecordDescriptor, this.connectionId);
        this.inputSideHandler = new FeedRuntimeInputHandler(this.ctx, this.connectionId, runtimeId, (IFrameWriter)wrapper, this.policyAccessor, false, new FrameTupleAccessor(this.recordDesc), this.recordDesc, this.feedManager, this.nPartitions);
        this.collectRuntime = new CollectionRuntime(this.connectionId, runtimeId, this.inputSideHandler, (IFrameWriter)wrapper, this.sourceRuntime, this.feedPolicy);
        this.feedManager.getFeedConnectionManager().registerFeedRuntime(this.connectionId, (FeedRuntime)this.collectRuntime);
        this.recordDesc = this.sourceRuntime.getRecordDescriptor();
        this.sourceRuntime.subscribeFeed(this.policyAccessor, this.collectRuntime);
    }

    private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
        if (this.policyAccessor.continueOnHardwareFailure()) {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Continuing on failure as per feed policy, switching to " + IFeedRuntime.Mode.STALL + " until failure is resolved");
            }
        } else {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + this.collectRuntime + " as feed is not configured to handle failures");
            }
            this.feedManager.getFeedConnectionManager().deRegisterFeedRuntime(this.connectionId, this.collectRuntime.getRuntimeId());
            this.writer.close();
            throw new HyracksDataException((Throwable)ie);
        }
        this.inputSideHandler.setMode(IFeedRuntime.Mode.STALL);
    }
}

