/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.feeds.CollectionRuntime;
import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
import org.apache.asterix.common.feeds.FeedCollectRuntimeInputHandler;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedFrameCollector;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedRuntime;
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
import org.apache.asterix.common.feeds.FeedRuntimeManager;
import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
import org.apache.asterix.common.feeds.IngestionRuntime;
import org.apache.asterix.common.feeds.IntakePartitionStatistics;
import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks;
import org.apache.asterix.common.feeds.StorageSideMonitoredBuffer;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedMessage;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.asterix.common.feeds.message.EndFeedMessage;
import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
import org.apache.asterix.metadata.feeds.PrepareStallMessage;
import org.apache.asterix.metadata.feeds.TerminateDataFlowMessage;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

public class FeedMessageOperatorNodePushable
extends AbstractUnaryOutputSourceOperatorNodePushable {
    private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
    private final FeedConnectionId connectionId;
    private final IFeedMessage message;
    private final IFeedManager feedManager;
    private final int partition;

    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId, IFeedMessage feedMessage, int partition, int nPartitions) {
        this.connectionId = connectionId;
        this.message = feedMessage;
        this.partition = partition;
        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)ctx.getJobletContext().getApplicationContext().getApplicationObject();
        this.feedManager = runtimeCtx.getFeedManager();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void initialize() throws HyracksDataException {
        try {
            this.writer.open();
            switch (this.message.getMessageType()) {
                case END: {
                    EndFeedMessage endFeedMessage = (EndFeedMessage)this.message;
                    switch (endFeedMessage.getEndMessageType()) {
                        case DISCONNECT_FEED: {
                            this.hanldeDisconnectFeedTypeMessage(endFeedMessage);
                            return;
                        }
                        case DISCONTINUE_SOURCE: {
                            this.handleDiscontinueFeedTypeMessage(endFeedMessage);
                            return;
                        }
                    }
                    return;
                }
                case PREPARE_STALL: {
                    this.handlePrepareStallMessage((PrepareStallMessage)this.message);
                    return;
                }
                case TERMINATE_FLOW: {
                    FeedConnectionId connectionId = ((TerminateDataFlowMessage)this.message).getConnectionId();
                    this.handleTerminateFlowMessage(connectionId);
                    return;
                }
                case COMMIT_ACK_RESPONSE: {
                    this.handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage)this.message);
                    return;
                }
                case THROTTLING_ENABLED: {
                    this.handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage)this.message);
                    return;
                }
            }
            return;
        }
        catch (Exception e) {
            throw new HyracksDataException((Throwable)e);
        }
        finally {
            this.writer.close();
        }
    }

    private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
        FeedConnectionId connectionId = throttlingMessage.getConnectionId();
        FeedRuntimeManager runtimeManager = this.feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
        Set runtimes = runtimeManager.getFeedRuntimes();
        for (FeedRuntimeId runtimeId : runtimes) {
            if (!runtimeId.getFeedRuntimeType().equals((Object)IFeedRuntime.FeedRuntimeType.STORE)) continue;
            FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
            ((StorageSideMonitoredBuffer)storeRuntime.getInputHandler().getmBuffer()).setAcking(false);
            if (!LOGGER.isLoggable(Level.INFO)) continue;
            LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline " + connectionId);
        }
    }

    private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
        IIntakeProgressTracker tracker;
        FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
        FeedRuntimeManager runtimeManager = this.feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
        Set runtimes = runtimeManager.getFeedRuntimes();
        for (FeedRuntimeId runtimeId : runtimes) {
            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
            switch (runtimeId.getFeedRuntimeType()) {
                case COLLECT: {
                    FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler)runtime.getInputHandler();
                    int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
                    inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
                    break;
                }
                case STORE: {
                    MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer().getStorageTimeTrackingRateTask();
                    sTask.receiveCommitAckResponse(commitResponseMessage);
                }
            }
        }
        commitResponseMessage.getIntakePartition();
        SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), IFeedRuntime.FeedRuntimeType.INTAKE, this.partition);
        IngestionRuntime ingestionRuntime = (IngestionRuntime)this.feedManager.getFeedSubscriptionManager().getSubscribableRuntime(sid);
        if (ingestionRuntime != null && (tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker()) != null) {
            tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
        }
    }

    private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
        FeedRuntimeManager runtimeManager = this.feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
        Set feedRuntimes = runtimeManager.getFeedRuntimes();
        boolean found = false;
        for (FeedRuntimeId runtimeId : feedRuntimes) {
            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
            if (!runtime.getRuntimeId().getRuntimeType().equals((Object)IFeedRuntime.FeedRuntimeType.COLLECT)) continue;
            ((CollectionRuntime)runtime).getFrameCollector().setState(FeedFrameCollector.State.HANDOVER);
            found = true;
            if (!LOGGER.isLoggable(Level.INFO)) continue;
            LOGGER.info("Switched " + runtime + " to Hand Over stage");
        }
        if (!found) {
            throw new HyracksDataException("COLLECT Runtime  not found!");
        }
    }

    private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
        FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
        int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
        FeedRuntimeManager runtimeManager = this.feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
        Set feedRuntimes = runtimeManager.getFeedRuntimes();
        block3: for (FeedRuntimeId runtimeId : feedRuntimes) {
            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
            switch (runtimeId.getFeedRuntimeType()) {
                case COMPUTE: {
                    IFeedRuntime.Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? IFeedRuntime.Mode.STALL : IFeedRuntime.Mode.END;
                    runtime.setMode(requiredMode);
                    continue block3;
                }
            }
            runtime.setMode(IFeedRuntime.Mode.STALL);
        }
    }

    private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
        FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
        SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId, IFeedRuntime.FeedRuntimeType.INTAKE, this.partition);
        ISubscribableRuntime feedRuntime = this.feedManager.getFeedSubscriptionManager().getSubscribableRuntime(subscribableRuntimeId);
        IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime)feedRuntime).getAdapterRuntimeManager();
        adapterRuntimeManager.stop();
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
        }
    }

    private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
        }
        FeedRuntimeId runtimeId = null;
        IFeedRuntime.FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage)this.message).getSourceRuntimeType();
        if (endFeedMessage.isCompleteDisconnection()) {
            IFeedRuntime.FeedRuntimeType runtimeType = null;
            switch (subscribableRuntimeType) {
                case INTAKE: {
                    runtimeType = IFeedRuntime.FeedRuntimeType.COLLECT;
                    break;
                }
                case COMPUTE: {
                    runtimeType = IFeedRuntime.FeedRuntimeType.COMPUTE_COLLECT;
                    break;
                }
                default: {
                    throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
                }
            }
            runtimeId = new FeedRuntimeId(runtimeType, this.partition, "N/A");
            CollectionRuntime feedRuntime = (CollectionRuntime)this.feedManager.getFeedConnectionManager().getFeedRuntime(this.connectionId, runtimeId);
            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
            }
        } else {
            block4 : switch (subscribableRuntimeType) {
                case INTAKE: {
                    throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
                }
                case COMPUTE: {
                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(this.connectionId.getFeedId(), IFeedRuntime.FeedRuntimeType.COMPUTE, this.partition);
                    ISubscribableRuntime feedRuntime = this.feedManager.getFeedSubscriptionManager().getSubscribableRuntime(feedSubscribableRuntimeId);
                    DistributeFeedFrameWriter dWriter = feedRuntime.getFeedFrameWriter();
                    Map registeredCollectors = dWriter.getRegisteredReaders();
                    FeedRuntimeInputHandler unsubscribingWriter = null;
                    for (Map.Entry entry : registeredCollectors.entrySet()) {
                        IFrameWriter frameWriter = (IFrameWriter)entry.getKey();
                        FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler)frameWriter;
                        if (!feedFrameWriter.getConnectionId().equals((Object)endFeedMessage.getFeedConnectionId())) continue;
                        unsubscribingWriter = feedFrameWriter;
                        dWriter.unsubscribeFeed((IFrameWriter)unsubscribingWriter);
                        if (!LOGGER.isLoggable(Level.INFO)) break block4;
                        LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
                        break block4;
                    }
                    break;
                }
            }
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Unsubscribed from feed :" + this.connectionId);
        }
    }
}

