/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedRuntime;
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.SubscribableRuntime;
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.asterix.metadata.feeds.FeedPolicyEnforcer;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;

public class FeedMetaComputeNodePushable
extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
    private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
    private FeedPolicyEnforcer policyEnforcer;
    private FeedRuntime feedRuntime;
    private FeedConnectionId connectionId;
    private int partition;
    private int nPartitions;
    private IFeedManager feedManager;
    private FrameTupleAccessor fta;
    private final IHyracksTaskContext ctx;
    private final IFeedRuntime.FeedRuntimeType runtimeType = IFeedRuntime.FeedRuntimeType.COMPUTE;
    private FeedRuntimeInputHandler inputSideHandler;

    public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
        this.ctx = ctx;
        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable)((IActivity)coreOperator).createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
        this.partition = partition;
        this.nPartitions = nPartitions;
        this.connectionId = feedConnectionId;
        this.feedManager = ((IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject()).getFeedManager();
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        this.feedManager = runtimeCtx.getFeedManager();
    }

    public void open() throws HyracksDataException {
        SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(this.connectionId.getFeedId(), this.runtimeType, this.partition);
        try {
            this.feedRuntime = this.feedManager.getFeedConnectionManager().getFeedRuntime(this.connectionId, (FeedRuntimeId)runtimeId);
            if (this.feedRuntime == null) {
                this.initializeNewFeedRuntime((FeedRuntimeId)runtimeId);
            } else {
                this.reviveOldFeedRuntime((FeedRuntimeId)runtimeId);
            }
            this.writer.open();
            this.coreOperator.open();
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new HyracksDataException((Throwable)e);
        }
    }

    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
        this.fta = new FrameTupleAccessor(this.recordDesc);
        this.inputSideHandler = new FeedRuntimeInputHandler(this.ctx, this.connectionId, runtimeId, (IFrameWriter)this.coreOperator, this.policyEnforcer.getFeedPolicyAccessor(), true, this.fta, this.recordDesc, this.feedManager, this.nPartitions);
        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(this.ctx, this.connectionId.getFeedId(), this.writer, this.runtimeType, this.partition, new FrameTupleAccessor(this.recordDesc), this.feedManager);
        this.coreOperator.setOutputFrameWriter(0, (IFrameWriter)distributeWriter, this.recordDesc);
        this.feedRuntime = new SubscribableRuntime(this.connectionId.getFeedId(), runtimeId, this.inputSideHandler, distributeWriter, this.recordDesc);
        this.feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime)this.feedRuntime);
        this.feedManager.getFeedConnectionManager().registerFeedRuntime(this.connectionId, this.feedRuntime);
        distributeWriter.subscribeFeed(this.policyEnforcer.getFeedPolicyAccessor(), this.writer, this.connectionId);
    }

    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
        this.fta = new FrameTupleAccessor(this.recordDesc);
        this.inputSideHandler = this.feedRuntime.getInputHandler();
        this.inputSideHandler.setCoreOperator((IFrameWriter)this.coreOperator);
        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(this.ctx, this.connectionId.getFeedId(), this.writer, this.runtimeType, this.partition, new FrameTupleAccessor(this.recordDesc), this.feedManager);
        this.coreOperator.setOutputFrameWriter(0, (IFrameWriter)distributeWriter, this.recordDesc);
        distributeWriter.subscribeFeed(this.policyEnforcer.getFeedPolicyAccessor(), this.writer, this.connectionId);
        this.inputSideHandler.reset(this.nPartitions);
        this.feedRuntime.setMode(IFeedRuntime.Mode.PROCESS);
    }

    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        try {
            this.inputSideHandler.nextFrame(buffer);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new HyracksDataException((Throwable)e);
        }
    }

    public void fail() throws HyracksDataException {
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("Core Op:" + this.coreOperator.getDisplayName() + " fail ");
        }
        this.feedRuntime.setMode(IFeedRuntime.Mode.FAIL);
        this.coreOperator.fail();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws HyracksDataException {
        boolean stalled = this.inputSideHandler.getMode().equals((Object)IFeedRuntime.Mode.STALL);
        boolean end = this.inputSideHandler.getMode().equals((Object)IFeedRuntime.Mode.END);
        try {
            if (this.inputSideHandler != null) {
                if (!stalled && !end) {
                    this.inputSideHandler.nextFrame(null);
                    while (!this.inputSideHandler.isFinished()) {
                        AbstractUnaryInputUnaryOutputOperatorNodePushable abstractUnaryInputUnaryOutputOperatorNodePushable = this.coreOperator;
                        synchronized (abstractUnaryInputUnaryOutputOperatorNodePushable) {
                            this.coreOperator.wait();
                        }
                    }
                } else {
                    this.inputSideHandler.setFinished(true);
                }
            }
            this.coreOperator.close();
            System.out.println("CLOSED " + this.coreOperator + " STALLED ?" + stalled + " ENDED " + end);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (!stalled) {
                this.deregister();
                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
            } else {
                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
            }
            if (this.inputSideHandler != null) {
                this.inputSideHandler.close();
            }
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
            }
        }
    }

    private void deregister() {
        if (this.feedRuntime != null) {
            SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId)this.feedRuntime.getRuntimeId();
            this.feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
            this.feedManager.getFeedConnectionManager().deRegisterFeedRuntime(this.connectionId, this.feedRuntime.getRuntimeId());
        }
    }
}

