/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedRuntime;
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.metadata.feeds.FeedPolicyEnforcer;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;

/**
 * Push runtime that wraps a "core" unary-input/unary-output operator taking part in a feed
 * connection.
 *
 * <p>Incoming frames are not handed to the core operator directly; they are passed to a
 * {@link FeedRuntimeInputHandler}, which is constructed with the core operator as its frame
 * writer. On {@link #open()} the feed connection manager is asked for an existing
 * {@link FeedRuntime} with the same runtime id: if one is found its input handler is reused,
 * otherwise a new handler and runtime are created.
 */
public class FeedMetaNodePushable
extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
    private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());

    /** The wrapped operator that does the actual processing. */
    private final AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
    /** Built from the feed policy properties; supplies the policy accessor for the input handler. */
    private final FeedPolicyEnforcer policyEnforcer;
    /** Assigned during {@link #open()}; {@code null} until then. */
    private FeedRuntime feedRuntime;
    private final FeedConnectionId connectionId;
    private final int partition;
    private final int nPartitions;
    private final IFeedRuntime.FeedRuntimeType runtimeType = IFeedRuntime.FeedRuntimeType.OTHER;
    private final IFeedManager feedManager;
    private FrameTupleAccessor fta;
    private final IHyracksTaskContext ctx;
    private final String operandId;
    /** Assigned during {@link #open()}; {@code null} until then. */
    private FeedRuntimeInputHandler inputSideHandler;

    /**
     * Creates the wrapper and instantiates the core operator's push runtime.
     *
     * @param ctx task context; also used to reach the application runtime context and its feed manager
     * @param recordDescProvider forwarded to the core operator's {@code createPushRuntime}
     * @param partition partition index of this instance
     * @param nPartitions total number of partitions
     * @param coreOperator descriptor of the operator to wrap; it is cast to {@link IActivity}
     * @param feedConnectionId identifies the feed connection this runtime belongs to
     * @param feedPolicyProperties properties used to build the {@link FeedPolicyEnforcer}
     * @param operationId operand id used when building the {@link FeedRuntimeId}
     * @throws HyracksDataException if the core operator's push runtime cannot be created
     */
    public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
        this.ctx = ctx;
        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable)((IActivity)coreOperator).createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
        this.partition = partition;
        this.nPartitions = nPartitions;
        this.connectionId = feedConnectionId;
        // Resolve the feed manager once; the application object is the Asterix runtime context.
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        this.feedManager = runtimeCtx.getFeedManager();
        this.operandId = operationId;
    }

    /**
     * Looks up a previously registered runtime for this connection/runtime id, reuses it if
     * present or creates a new one otherwise, and then opens the core operator.
     *
     * @throws HyracksDataException wrapping any exception raised during setup or by the core operator
     */
    @Override
    public void open() throws HyracksDataException {
        FeedRuntimeId runtimeId = new FeedRuntimeId(this.runtimeType, this.partition, this.operandId);
        try {
            this.feedRuntime = this.feedManager.getFeedConnectionManager().getFeedRuntime(this.connectionId, runtimeId);
            if (this.feedRuntime == null) {
                this.initializeNewFeedRuntime(runtimeId);
            } else {
                this.reviveOldFeedRuntime(runtimeId);
            }
            this.coreOperator.open();
        }
        catch (Exception e) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "Unable to open feed meta operator " + runtimeId, e);
            }
            throw new HyracksDataException(e);
        }
    }

    /** Creates a fresh input handler that writes into the core operator, then builds the runtime. */
    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
        this.fta = new FrameTupleAccessor(this.recordDesc);
        this.inputSideHandler = new FeedRuntimeInputHandler(this.ctx, this.connectionId, runtimeId, (IFrameWriter)this.coreOperator, this.policyEnforcer.getFeedPolicyAccessor(), false, this.fta, this.recordDesc, this.feedManager, this.nPartitions);
        this.setupBasicRuntime(this.inputSideHandler);
    }

    /**
     * Reuses the input handler of the runtime found by {@link #open()}, rewires the core
     * operator's output and switches the runtime to {@code PROCESS} mode.
     */
    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
        this.inputSideHandler = this.feedRuntime.getInputHandler();
        this.fta = new FrameTupleAccessor(this.recordDesc);
        this.coreOperator.setOutputFrameWriter(0, this.writer, this.recordDesc);
        this.feedRuntime.setMode(IFeedRuntime.Mode.PROCESS);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Retreived state from the zombie instance " + this.runtimeType + " node.");
        }
    }

    /** Connects the core operator to this operator's writer and creates the {@link FeedRuntime}. */
    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
        this.coreOperator.setOutputFrameWriter(0, this.writer, this.recordDesc);
        FeedRuntimeId runtimeId = new FeedRuntimeId(this.runtimeType, this.partition, this.operandId);
        this.feedRuntime = new FeedRuntime(runtimeId, inputHandler, this.writer);
    }

    /**
     * Forwards the frame to the input-side handler.
     *
     * @param buffer the incoming frame
     * @throws HyracksDataException wrapping any exception raised by the handler
     */
    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        try {
            this.inputSideHandler.nextFrame(buffer);
        }
        catch (Exception e) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "Failure while processing frame in feed meta operator", e);
            }
            throw new HyracksDataException(e);
        }
    }

    /**
     * Marks the feed runtime as failed (when one exists) and propagates the failure to the
     * core operator.
     *
     * @throws HyracksDataException if the core operator's {@code fail()} throws
     */
    @Override
    public void fail() throws HyracksDataException {
        if (LOGGER.isLoggable(Level.WARNING)) {
            // Emit at the same level that is being checked so the message is not filtered out.
            LOGGER.warning("Core Op:" + this.coreOperator.getDisplayName() + " fail ");
        }
        // feedRuntime is only assigned during open(); guard so an early failure still
        // reaches the core operator instead of dying with a NullPointerException here.
        if (this.feedRuntime != null) {
            this.feedRuntime.setMode(IFeedRuntime.Mode.FAIL);
        }
        this.coreOperator.fail();
    }

    /**
     * Closes the core operator and then the input-side handler. A failure while closing the
     * core operator is logged and deliberately not rethrown, so the handler is still closed.
     *
     * @throws HyracksDataException if closing the input-side handler throws
     */
    @Override
    public void close() throws HyracksDataException {
        try {
            this.coreOperator.close();
        }
        catch (Exception e) {
            // Best effort: keep going so the input handler below is still released.
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "Ignoring failure while closing core operator", e);
            }
        }
        finally {
            if (this.inputSideHandler != null) {
                this.inputSideHandler.close();
            }
            // feedRuntime may be null if open() never completed its setup.
            if (this.feedRuntime != null && LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
            }
        }
    }
}

